banner
jzman

jzman

Coding、思考、自觉。
github

IjkPlayer系列之消息循环机制

PS: 最近读到一句歌德的一句诗:无论你能做什么,或者梦想做什么,着手开始吧,大胆就是天赋、能量和魔力的代名词。

前面两篇文章中介绍了 JNI 基础知识以及 IjkPlayer 播放器的创建流程:

本文主要内容如下:

  1. AVMessage 和 MessageQueue
  2. 消息队列初始化
  3. 消息循环的启动
  4. 消息循环线程
  5. 消息循环函数
  6. 小结

AVMessage 和 MessageQueue#

先来看看 AVMessageMessageQueue 两个结构体定义:

// ff_ffmsg_queue.h
typedef struct AVMessage {
    int what;
    int arg1;
    int arg2;
    void *obj;
    void (*free_l)(void *obj);
    struct AVMessage *next;
} AVMessage;

typedef struct MessageQueue {
    AVMessage *first_msg, *last_msg;
    int nb_messages;
    int abort_request;
    SDL_mutex *mutex;
    SDL_cond *cond;

    AVMessage *recycle_msg;
    int recycle_count;
    int alloc_count;
} MessageQueue;

AVMessageMessageQueue 的定义和实现都在 ff_ffmsg_queue.h 中,其相关操作函数主要是 msg_xxxmsg_quene_xxx,如下:

// AVMessage
void msg_xxx(AVMessage *msg)
// MessageQueue
void msg_queue_xxx(MessageQueue *q)

MessageQueue 关键函数如下:

// 初始化MessageQueue
void msg_queue_init(MessageQueue *q)
// 重置MessageQueue
void msg_queue_flush(MessageQueue *q)
// q->abort_request设置为0保证消息循环msg_loop能够进行
void msg_queue_start(MessageQueue *q)
// msg_quene_put_xxx系列函数都会调用msg_queue_put_private
int msg_queue_put_private(MessageQueue *q, AVMessage *msg)
// 获取MessageQueue中的第一条消息
int msg_queue_get(MessageQueue *q, AVMessage *msg, int block)

消息队列初始化#

消息队列初始化是在 IjkPlayer 播放器创建过程中初始化的,其关键函数调用如下:

IjkMediaPlayer_native_setup->ijkmp_android_create->ijkmp_create

这里直接从 ijkmp_create 函数开始来看消息队列的初始化。

消息队列对应的是定义在 FFPlayer 中的 msg_queue 成员,在 IjkMediaPlayer 结构体创建过程中会调用函数 ffp_create 初始化 ffplayer,如下:

// ijkplayer.c
IjkMediaPlayer *ijkmp_create(int (*msg_loop)(void*)){
    IjkMediaPlayer *mp = (IjkMediaPlayer *) mallocz(sizeof(IjkMediaPlayer));
    if (!mp)
        goto fail;
    // 创建FFPlayer并初始化mp->ffplayer
    mp->ffplayer = ffp_create();
    // mp->msg_loop
    mp->msg_loop = msg_loop;
    // ...
}

继续查看函数 ffp_create 实现:

// ff_ffplay.c
FFPlayer *ffp_create(){
    // ...
    // 消息队列初始化
    msg_queue_init(&ffp->msg_queue);
    ffp->af_mutex = SDL_CreateMutex();
    ffp->vf_mutex = SDL_CreateMutex();
    // 内部调用msg_queue_flush
    ffp_reset_internal(ffp);
    // ...
    return ffp;
}

在函数 ffp_create 中调用了 msg_queue_init 初始化了消息队列 msg_queue,这里会将 msg_loopabort_request 置为 1,后续启动消息循环线程的时候会将其置为 abort_request 置为 0。

ffp_reset_internal 中内部调用了 msg_queue_flush 重置了 msg_loop,到此消息队列 msg_loop 完成了初始化。

继续往下 mp->msg_loop = msg_loop 完成了消息循环函数的赋值,ijkmp_create 传递进来的消息循环函数是 message_loop,该函数将在后面小节中介绍,到此 message_loop 完成赋值。

函数指针 msg_loop 固然是一个函数,那么 msg_loop什么时候被调用 的呢?

消息循环的启动#

消息循环的开始是 msg_queue_start 函数,其调用流程是从准备播放开始,即应用层调用 prepareAsync 开始准备播放时触发,函数调用流程如下:

Mermaid Loading...

这里看下 ijkmp_prepare_async_l 函数实现:

// ijkplayer.c
static int ijkmp_prepare_async_l(IjkMediaPlayer *mp){
    // ...
    ijkmp_change_state_l(mp, MP_STATE_ASYNC_PREPARING);

    // 开启消息循环
    msg_queue_start(&mp->ffplayer->msg_queue);

    // released in msg_loop
    ijkmp_inc_ref(mp);
    mp->msg_thread = SDL_CreateThreadEx(&mp->_msg_thread, ijkmp_msg_loop, mp, "ff_msg_loop");
    // ...
    return 0;
}

显然调用了 msg_queue_start 开启消息循环,看下 msg_queue_start 的函数实现:

// ff_ffmsg_queue.h
inline static void msg_queue_start(MessageQueue *q){
    SDL_LockMutex(q->mutex);
    // 关键
    q->abort_request = 0;
    AVMessage msg;
    msg_init_msg(&msg);
    msg.what = FFP_MSG_FLUSH;
    msg_queue_put_private(q, &msg);
    SDL_UnlockMutex(q->mutex);
}

这里将 abort_request 置为 0 表示允许消息 AVMessage 入队和出队,不调用该函数则无法完成消息循环,故初始化消息循环 msg_queue 之后还需调用 msg_queue_start 来启动消息循环。

继续看下消息循环获取函数 msg_queue_get 的实现,消息的获取就是通过该函数不断获取通知到应用层的,参数 block 为 1 表示阻塞,0 表示不阻塞,根据调用这里传入的是 1,也就是当消息队列 msg_quene 中没有消息时会等待添加消息后继续处理,如下:

 // ff_ffmsg_queue.h
inline static int msg_queue_get(MessageQueue *q, AVMessage *msg, int block){
    AVMessage *msg1;
    int ret;
    SDL_LockMutex(q->mutex);
    for (;;) {
        // abort
        if (q->abort_request) {
            ret = -1;
            break;
        }
        // 获取队首消息
        msg1 = q->first_msg;
        if (msg1) {// 处理队列中的消息
            q->first_msg = msg1->next;
            if (!q->first_msg)
                q->last_msg = NULL;
            q->nb_messages--;
            *msg = *msg1;
            msg1->obj = NULL;
#ifdef FFP_MERGE
            av_free(msg1);
#else
            msg1->next = q->recycle_msg;
            q->recycle_msg = msg1;
#endif
            ret = 1;
            break;
        } else if (!block) {// 直接退出
            ret = 0;
            break;
        } else {// 阻塞等待
            SDL_CondWait(q->cond, q->mutex);
        }
    }
    SDL_UnlockMutex(q->mutex);
    return ret;
}

上述代码只有 q->abort_request 为 0 才会开始循环获取消息,这也就是为什么要使用 msg_queue_start 来开启消息循环的原因。

这里的开启消息循环只是保证消息能够正常出队入队,但是还是没真正运行消息循环函数 msg_loop

消息循环线程#

记得上文中留一个问题,msg_loop 是什么时候被调用的呢,答案就是 msg_loop 是在消息循环线程中调用的,接着上文继续看下 ijkmp_prepare_async_l,在该函数里面先调用了 msg_queue_start,然后创建了消息循环线程,如下:

// ijkplayer.c
static int ijkmp_prepare_async_l(IjkMediaPlayer *mp){
    // ...
    ijkmp_change_state_l(mp, MP_STATE_ASYNC_PREPARING);
    // 开启消息循环
    msg_queue_start(&mp->ffplayer->msg_queue);
    // released in msg_loop
    ijkmp_inc_ref(mp);
    // 创建消息循环线程
    mp->msg_thread = SDL_CreateThreadEx(&mp->_msg_thread, ijkmp_msg_loop, mp, "ff_msg_loop");
    // ...
    return 0;
}

上述代码中 SDL_CreateThreadEx 创建了线程名为 ff_msg_loop 的线程,线程体运行函数为 ijkmp_msg_loop, 同时 IjkMediaPlayer 结构体的成员 msg_thread 被赋值,当线程创建完成后运行线程体函数 ijkmp_msg_loop,如下:

// ijkplayer.c
static int ijkmp_msg_loop(void *arg){
    IjkMediaPlayer *mp = arg;
    // 调用消息循环函数
    int ret = mp->msg_loop(arg);
    return ret;
}

这里完成了消息循环函数 msg_loop 的调用,到此 IjkPlayer 的消息循环真正启动,下面继续看下消息是如何发送到应用层的。

消息循环函数#

消息循环函数是在播放器创建时,在 IjkMediaPlayer_native_setup 里面传入的,如下:

// ijkplayer_jni.c
static void IjkMediaPlayer_native_setup(JNIEnv *env, jobject thiz, jobject weak_this){
    MPTRACE("%s\n", __func__);
    // 创建C层对应的IjkMediaPlayer
    IjkMediaPlayer *mp = ijkmp_android_create(message_loop);
    // ...
}

message_loop 最终会被赋值给 IjkMediaPlayer 结构体的成员 msg_loop,从前文知道消息循环线程 msg_thread 中调用了消息循环函数 msg_loop,即这里的 message_loop ,其实现如下:

// ijkplayer_jni.c
static int message_loop(void *arg){
    // ...
    IjkMediaPlayer *mp = (IjkMediaPlayer*) arg;
    // 关键函数message_loop_n
    message_loop_n(env, mp);
    // ...
}

继续看下关键函数 message_loop_n 的实现:

// ijkplayer_jni.c
static void message_loop_n(JNIEnv *env, IjkMediaPlayer *mp){
    jobject weak_thiz = (jobject) ijkmp_get_weak_thiz(mp);
    JNI_CHECK_GOTO(weak_thiz, env, NULL, "mpjni: message_loop_n: null weak_thiz", LABEL_RETURN);
    while (1) {
        AVMessage msg;
        // 从MessageQueue获取一个消息AVMessage
        int retval = ijkmp_get_msg(mp, &msg, 1);
        if (retval < 0)
            break;

        // block-get should never return 0
        assert(retval > 0);
        // 处理各种播放事件
        switch (msg.what) {
        case FFP_MSG_PREPARED:
            MPTRACE("FFP_MSG_PREPARED:\n");
            // 关键函数
            post_event(env, weak_thiz, MEDIA_PREPARED, 0, 0);
            break;
        // ...
        default:
            ALOGE("unknown FFP_MSG_xxx(%d)\n", msg.what);
            break;
        }
        // 内存资源回收
        msg_free_res(&msg);
    }
LABEL_RETURN:
    ;
}

可见在消息循环函数中会以死循环的方式通过 ijkmp_get_msg 获取消息,然后通过 post_event 将消息发送给应用层:

// ijkplayer_jni.c
inline static void post_event(JNIEnv *env, jobject weak_this, int what, int arg1, int arg2){
    // postEventFromNative
    J4AC_IjkMediaPlayer__postEventFromNative(env, weak_this, what, arg1, arg2, NULL);
}

函数 post_event 调用 Java 层的 postEventFromNative 方法完成消息的回传,如下:

@CalledByNative
private static void postEventFromNative(Object weakThiz, int what,
        int arg1, int arg2, Object obj) {
    if (weakThiz == null)
        return;

    @SuppressWarnings("rawtypes")
    IjkMediaPlayer mp = (IjkMediaPlayer) ((WeakReference) weakThiz).get();
    if (mp == null) {
        return;
    }

    if (what == MEDIA_INFO && arg1 == MEDIA_INFO_STARTED_AS_NEXT) {
        // this acquires the wakelock if needed, and sets the client side
        // state
        mp.start();
    }
    if (mp.mEventHandler != null) {
        Message m = mp.mEventHandler.obtainMessage(what, arg1, arg2, obj);
        // 发送消息
        mp.mEventHandler.sendMessage(m);
    }
}

postEventFromNative 收到底层 IjkPlayer 发送的消息将其转换成 Message 交给 EventHandler 进行处理,如下:

private static class EventHandler extends Handler {
    private final WeakReference<IjkMediaPlayer> mWeakPlayer;
    public EventHandler(IjkMediaPlayer mp, Looper looper) {
        super(looper);
        mWeakPlayer = new WeakReference<IjkMediaPlayer>(mp);
    }
    @Override
    public void handleMessage(Message msg) {
        IjkMediaPlayer player = mWeakPlayer.get();
        if (player == null || player.mNativeMediaPlayer == 0) {
            DebugLog.w(TAG,
                    "IjkMediaPlayer went away with unhandled events");
            return;
        }
        switch (msg.what) {
        case MEDIA_PREPARED:
            player.notifyOnPrepared();
            return;
        // ...
        default:
            DebugLog.e(TAG, "Unknown message type " + msg.what);
        }
    }
}

根据不同的消息类型进行处理,如上是 MEDIA_PREPARED 事件,最后回调给对应的回调接口,如这里的 mOnPreparedListener,如下:

protected final void notifyOnPrepared() {
    if (mOnPreparedListener != null)
        // 播放准备完成事件
        mOnPreparedListener.onPrepared(this);
}

到此消息循环函数执行完毕。

小结#

本文行文是从 Native 层开始,一直到 Java 层收到消息,IjkPlayer 的消息循环中最重要的就是消息队列 msg_quene,播放器从起播到结束产生的相关事件消息都会添加到该队列中,消息循环线程负责取出消息并通知出去,如果无消息可取,则会阻塞等待添加消息后继续执行消息循环流程。

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.