banner
jzman

jzman

Coding、思考、自觉。
github

IjkPlayer系列之消息循环机制

PS: 最近读到一句歌德的一句诗:无论你能做什么,或者梦想做什么,着手开始吧,大胆就是天赋、能量和魔力的代名词。

前面两篇文章中介绍了 JNI 基础知识以及 IjkPlayer 播放器的创建流程:

本文主要内容如下:

  1. AVMessage 和 MessageQueue
  2. 消息队列初始化
  3. 消息循环的启动
  4. 消息循环线程
  5. 消息循环函数
  6. 小结

AVMessage 和 MessageQueue#

先来看看 AVMessageMessageQueue 两个结构体定义:

// ff_ffmsg_queue.h
typedef struct AVMessage {
    int what;
    int arg1;
    int arg2;
    void *obj;
    void (*free_l)(void *obj);
    struct AVMessage *next;
} AVMessage;

typedef struct MessageQueue {
    AVMessage *first_msg, *last_msg;
    int nb_messages;
    int abort_request;
    SDL_mutex *mutex;
    SDL_cond *cond;

    AVMessage *recycle_msg;
    int recycle_count;
    int alloc_count;
} MessageQueue;

AVMessageMessageQueue 的定义和实现都在 ff_ffmsg_queue.h 中,其相关操作函数主要是 msg_xxxmsg_quene_xxx,如下:

// AVMessage
void msg_xxx(AVMessage *msg)
// MessageQueue
void msg_queue_xxx(MessageQueue *q)

MessageQueue 关键函数如下:

// 初始化MessageQueue
void msg_queue_init(MessageQueue *q)
// 重置MessageQueue
void msg_queue_flush(MessageQueue *q)
// q->abort_request设置为0保证消息循环msg_loop能够进行
void msg_queue_start(MessageQueue *q)
// msg_quene_put_xxx系列函数都会调用msg_queue_put_private
int msg_queue_put_private(MessageQueue *q, AVMessage *msg)
// 获取MessageQueue中的第一条消息
int msg_queue_get(MessageQueue *q, AVMessage *msg, int block)

消息队列初始化#

消息队列初始化是在 IjkPlayer 播放器创建过程中初始化的,其关键函数调用如下:

IjkMediaPlayer_native_setup->ijkmp_android_create->ijkmp_create

这里直接从 ijkmp_create 函数开始来看消息队列的初始化。

消息队列对应的是定义在 FFPlayer 中的 msg_queue 成员,在 IjkMediaPlayer 结构体创建过程中会调用函数 ffp_create 初始化 ffplayer,如下:

// ijkplayer.c
IjkMediaPlayer *ijkmp_create(int (*msg_loop)(void*)){
    IjkMediaPlayer *mp = (IjkMediaPlayer *) mallocz(sizeof(IjkMediaPlayer));
    if (!mp)
        goto fail;
    // 创建FFPlayer并初始化mp->ffplayer
    mp->ffplayer = ffp_create();
    // mp->msg_loop
    mp->msg_loop = msg_loop;
    // ...
}

继续查看函数 ffp_create 实现:

// ff_ffplay.c
FFPlayer *ffp_create(){
    // ...
    // 消息队列初始化
    msg_queue_init(&ffp->msg_queue);
    ffp->af_mutex = SDL_CreateMutex();
    ffp->vf_mutex = SDL_CreateMutex();
    // 内部调用msg_queue_flush
    ffp_reset_internal(ffp);
    // ...
    return ffp;
}

在函数 ffp_create 中调用了 msg_queue_init 初始化了消息队列 msg_queue,这里会将 msg_loopabort_request 置为 1,后续启动消息循环线程的时候会将其置为 abort_request 置为 0。

ffp_reset_internal 中内部调用了 msg_queue_flush 重置了 msg_loop,到此消息队列 msg_loop 完成了初始化。

继续往下 mp->msg_loop = msg_loop 完成了消息循环函数的赋值,ijkmp_create 传递进来的消息循环函数是 message_loop,该函数将在后面小节中介绍,到此 message_loop 完成赋值。

函数指针 msg_loop 固然是一个函数,那么 msg_loop什么时候被调用 的呢?

消息循环的启动#

消息循环的开始是 msg_queue_start 函数,其调用流程是从准备播放开始,即应用层调用 prepareAsync 开始准备播放时触发,函数调用流程如下:

Mermaid Loading...

这里看下 ijkmp_prepare_async_l 函数实现:

// ijkplayer.c
static int ijkmp_prepare_async_l(IjkMediaPlayer *mp){
    // ...
    ijkmp_change_state_l(mp, MP_STATE_ASYNC_PREPARING);

    // 开启消息循环
    msg_queue_start(&mp->ffplayer->msg_queue);

    // released in msg_loop
    ijkmp_inc_ref(mp);
    mp->msg_thread = SDL_CreateThreadEx(&mp->_msg_thread, ijkmp_msg_loop, mp, "ff_msg_loop");
    // ...
    return 0;
}

显然调用了 msg_queue_start 开启消息循环,看下 msg_queue_start 的函数实现:

// ff_ffmsg_queue.h
inline static void msg_queue_start(MessageQueue *q){
    SDL_LockMutex(q->mutex);
    // 关键
    q->abort_request = 0;
    AVMessage msg;
    msg_init_msg(&msg);
    msg.what = FFP_MSG_FLUSH;
    msg_queue_put_private(q, &msg);
    SDL_UnlockMutex(q->mutex);
}

这里将 abort_request 置为 0 表示允许消息 AVMessage 入队和出队,不调用该函数则无法完成消息循环,故初始化消息循环 msg_queue 之后还需调用 msg_queue_start 来启动消息循环。

继续看下消息循环获取函数 msg_queue_get 的实现,消息的获取就是通过该函数不断获取通知到应用层的,参数 block 为 1 表示阻塞,0 表示不阻塞,根据调用这里传入的是 1,也就是当消息队列 msg_quene 中没有消息时会等待添加消息后继续处理,如下:

 // ff_ffmsg_queue.h
inline static int msg_queue_get(MessageQueue *q, AVMessage *msg, int block){
    AVMessage *msg1;
    int ret;
    SDL_LockMutex(q->mutex);
    for (;;) {
        // abort
        if (q->abort_request) {
            ret = -1;
            break;
        }
        // 获取队首消息
        msg1 = q->first_msg;
        if (msg1) {// 处理队列中的消息
            q->first_msg = msg1->next;
            if (!q->first_msg)
                q->last_msg = NULL;
            q->nb_messages--;
            *msg = *msg1;
            msg1->obj = NULL;
#ifdef FFP_MERGE
            av_free(msg1);
#else
            msg1->next = q->recycle_msg;
            q->recycle_msg = msg1;
#endif
            ret = 1;
            break;
        } else if (!block) {// 直接退出
            ret = 0;
            break;
        } else {// 阻塞等待
            SDL_CondWait(q->cond, q->mutex);
        }
    }
    SDL_UnlockMutex(q->mutex);
    return ret;
}

上述代码只有 q->abort_request 为 0 才会开始循环获取消息,这也就是为什么要使用 msg_queue_start 来开启消息循环的原因。

这里的开启消息循环只是保证消息能够正常出队入队,但是还是没真正运行消息循环函数 msg_loop

消息循环线程#

记得上文中留一个问题,msg_loop 是什么时候被调用的呢,答案就是 msg_loop 是在消息循环线程中调用的,接着上文继续看下 ijkmp_prepare_async_l,在该函数里面先调用了 msg_queue_start,然后创建了消息循环线程,如下:

// ijkplayer.c
static int ijkmp_prepare_async_l(IjkMediaPlayer *mp){
    // ...
    ijkmp_change_state_l(mp, MP_STATE_ASYNC_PREPARING);
    // 开启消息循环
    msg_queue_start(&mp->ffplayer->msg_queue);
    // released in msg_loop
    ijkmp_inc_ref(mp);
    // 创建消息循环线程
    mp->msg_thread = SDL_CreateThreadEx(&mp->_msg_thread, ijkmp_msg_loop, mp, "ff_msg_loop");
    // ...
    return 0;
}

上述代码中 SDL_CreateThreadEx 创建了线程名为 ff_msg_loop 的线程,线程体运行函数为 ijkmp_msg_loop, 同时 IjkMediaPlayer 结构体的成员 msg_thread 被赋值,当线程创建完成后运行线程体函数 ijkmp_msg_loop,如下:

// ijkplayer.c
static int ijkmp_msg_loop(void *arg){
    IjkMediaPlayer *mp = arg;
    // 调用消息循环函数
    int ret = mp->msg_loop(arg);
    return ret;
}

这里完成了消息循环函数 msg_loop 的调用,到此 IjkPlayer 的消息循环真正启动,下面继续看下消息是如何发送到应用层的。

消息循环函数#

消息循环函数是在播放器创建时,在 IjkMediaPlayer_native_setup 里面传入的,如下:

// ijkplayer_jni.c
static void IjkMediaPlayer_native_setup(JNIEnv *env, jobject thiz, jobject weak_this){
    MPTRACE("%s\n", __func__);
    // 创建C层对应的IjkMediaPlayer
    IjkMediaPlayer *mp = ijkmp_android_create(message_loop);
    // ...
}

message_loop 最终会被赋值给 IjkMediaPlayer 结构体的成员 msg_loop,从前文知道消息循环线程 msg_thread 中调用了消息循环函数 msg_loop,即这里的 message_loop ,其实现如下:

// ijkplayer_jni.c
static int message_loop(void *arg){
    // ...
    IjkMediaPlayer *mp = (IjkMediaPlayer*) arg;
    // 关键函数message_loop_n
    message_loop_n(env, mp);
    // ...
}

继续看下关键函数 message_loop_n 的实现:

// ijkplayer_jni.c
static void message_loop_n(JNIEnv *env, IjkMediaPlayer *mp){
    jobject weak_thiz = (jobject) ijkmp_get_weak_thiz(mp);
    JNI_CHECK_GOTO(weak_thiz, env, NULL, "mpjni: message_loop_n: null weak_thiz", LABEL_RETURN);
    while (1) {
        AVMessage msg;
        // 从MessageQueue获取一个消息AVMessage
        int retval = ijkmp_get_msg(mp, &msg, 1);
        if (retval < 0)
            break;

        // block-get should never return 0
        assert(retval > 0);
        // 处理各种播放事件
        switch (msg.what) {
        case FFP_MSG_PREPARED:
            MPTRACE("FFP_MSG_PREPARED:\n");
            // 关键函数
            post_event(env, weak_thiz, MEDIA_PREPARED, 0, 0);
            break;
        // ...
        default:
            ALOGE("unknown FFP_MSG_xxx(%d)\n", msg.what);
            break;
        }
        // 内存资源回收
        msg_free_res(&msg);
    }
LABEL_RETURN:
    ;
}

可见在消息循环函数中会以死循环的方式通过 ijkmp_get_msg 获取消息,然后通过 post_event 将消息发送给应用层:

// ijkplayer_jni.c
inline static void post_event(JNIEnv *env, jobject weak_this, int what, int arg1, int arg2){
    // postEventFromNative
    J4AC_IjkMediaPlayer__postEventFromNative(env, weak_this, what, arg1, arg2, NULL);
}

函数 post_event 调用 Java 层的 postEventFromNative 方法完成消息的回传,如下:

@CalledByNative
private static void postEventFromNative(Object weakThiz, int what,
        int arg1, int arg2, Object obj) {
    if (weakThiz == null)
        return;

    @SuppressWarnings("rawtypes")
    IjkMediaPlayer mp = (IjkMediaPlayer) ((WeakReference) weakThiz).get();
    if (mp == null) {
        return;
    }

    if (what == MEDIA_INFO && arg1 == MEDIA_INFO_STARTED_AS_NEXT) {
        // this acquires the wakelock if needed, and sets the client side
        // state
        mp.start();
    }
    if (mp.mEventHandler != null) {
        Message m = mp.mEventHandler.obtainMessage(what, arg1, arg2, obj);
        // 发送消息
        mp.mEventHandler.sendMessage(m);
    }
}

postEventFromNative 收到底层 IjkPlayer 发送的消息将其转换成 Message 交给 EventHandler 进行处理,如下:

private static class EventHandler extends Handler {
    private final WeakReference<IjkMediaPlayer> mWeakPlayer;
    public EventHandler(IjkMediaPlayer mp, Looper looper) {
        super(looper);
        mWeakPlayer = new WeakReference<IjkMediaPlayer>(mp);
    }
    @Override
    public void handleMessage(Message msg) {
        IjkMediaPlayer player = mWeakPlayer.get();
        if (player == null || player.mNativeMediaPlayer == 0) {
            DebugLog.w(TAG,
                    "IjkMediaPlayer went away with unhandled events");
            return;
        }
        switch (msg.what) {
        case MEDIA_PREPARED:
            player.notifyOnPrepared();
            return;
        // ...
        default:
            DebugLog.e(TAG, "Unknown message type " + msg.what);
        }
    }
}

根据不同的消息类型进行处理,如上是 MEDIA_PREPARED 事件,最后回调给对应的回调接口,如这里的 mOnPreparedListener,如下:

protected final void notifyOnPrepared() {
    if (mOnPreparedListener != null)
        // 播放准备完成事件
        mOnPreparedListener.onPrepared(this);
}

到此消息循环函数执行完毕。

小结#

本文行文是从 Native 层开始,一直到 Java 层收到消息,IjkPlayer 的消息循环中最重要的就是消息队列 msg_quene,播放器从起播到结束产生的相关事件消息都会添加到该队列中,消息循环线程负责取出消息并通知出去,如果无消息可取,则会阻塞等待添加消息后继续执行消息循环流程。

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。