异步消息与同步屏障

Post on Nov 08, 2021 by Wei Lin

异步消息

什么是异步消息?

Android中的消息可以分为同步消息和异步消息。

平时我们经常使用的就是同步消息,同步消息通常会放在MessageQueue的队尾,也可以通过sendMessageAtFrontOfQueue或postAtFrontOfQueue直接放到队列头,但无论如何这些消息进入到MessageQueue都是要排队执行的。所以同步消息由于要排队,会导致靠后的消息会延迟执行,对于一些实时要求高的消息(如屏幕刷新的VSync消息)则影响用户体验。

所以便有了异步消息的机制,如果消息发送后不需要排队就能直接执行,而且还需要阻止其它同步消息执行(即同步屏障,避免postAtFrontOfQueue消息),则可以保证消息的及时执行。

如何发送异步消息?

发送异步消息可以调用Message.setAsynchronous(true),而执行同步屏障需要调用MessageQueue.postSyncBarrier(),但用户无法直接调用MessageQueue的,所以APP通常无法发送异步消息,但实际上可以通过反射来发送异步消息。

如果不执行MessageQueue.postSyncBarrier()则异步消息无效,也会像同步消息一样执行。

同步屏障

关于Android Handler同步屏障那些事—CSDN

同步屏障消息就是在消息队列中插入一个屏障标识的消息(即msg.target==null),在屏障之后的所有普通消息都会被挡着,不能被处理。不过异步消息却例外,屏障不会挡住异步消息,因此可以认为,屏障消息就是为了确保异步消息的优先级,设置了屏障后,只能处理其后的异步消息,同步消息会被挡住,除非撤销屏障。

同步屏障是通过MessageQueue.postSyncBarrier()方法开启的。通过MessageQueue.removeSyncBarrier(token)移除。

代码示例

Android中异步消息和同步屏障—CSDN

Base on: Android 13

Branch: android-13.0.0_r30

Android是不支持用户直接调用MessageQueue.postSyncBarrier()来设置同步屏障的,只有系统才能调用。但可以通过反射的方式调用打开和移除同步屏障的函数。

public void sendAsyncMsg() {
    // 构造同步和异步消息
    Message msg1 = Message.obtain(mMainHanlder, () -> Log.d(TAG, "1s后执行的同步消息!"));
    Message msg2 = Message.obtain(mMainHanlder, () -> Log.d(TAG, "2s后执行的同步消息!"));
    Message msg3 = Message.obtain(mMainHanlder, () -> Log.d(TAG, "3s后执行的异步消息!"));
    Message msg4 = Message.obtain(mMainHanlder, () -> Log.d(TAG, "4s后执行的异步消息!"));
    msg3.setAsynchronous(true);
    msg4.setAsynchronous(true);

    // 发送消息并设置延时执行
    mMainHanlder.sendMessageDelayed(msg1, 1000); // 发送1秒后执行的同步消息
    mMainHanlder.sendMessageDelayed(msg2, 2000); // 发送2秒后执行的同步消息
    mMainHanlder.sendMessageDelayed(msg3, 3000); // 发送3秒后执行的异步消息
    mMainHanlder.sendMessageDelayed(msg4, 4000); // 发送4秒后执行的异步消息

    // 设置同步屏障生效
    int token = 0;
    try {
        Method method = MessageQueue.class.getDeclaredMethod("postSyncBarrier");
        token = (int) method.invoke(Looper.getMainLooper().getQueue());
    } catch (Exception e) {
        e.printStackTrace();
    }
    Log.d(TAG, "Token=" + token);

    // 解除同步屏障
    // 注意:解除同步屏障也必须是要在异步消息的执行代码中中解除,因为同步消息在屏障期间不会被执行
    int finalToken = token; // 从内部类引用的外部变量必须是final的
    Message msg5 = Message.obtain(mMainHanlder, new Runnable() {
        @Override
        public void run() {
            try {
                Method method = MessageQueue.class.getDeclaredMethod("removeSyncBarrier", int.class);
                method.invoke(Looper.getMainLooper().getQueue(), finalToken);
                Log.d(TAG, "移除同步屏障");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
    msg5.setAsynchronous(true);
    mMainHanlder.sendMessageDelayed(msg5, 4500);
}

如上代码中,依次向队列中发送1s后和2s后执行的同步消息,以及3s后和4s后执行的异步消息,然后开启同步屏障。

输出为:

Token=28

3s后执行的异步消息!

4s后执行的异步消息!

移除同步屏障

1s后执行的同步消息!

2s后执行的同步消息!

可以发现,之后在移除同步屏障后才会执行同步消息。

同步屏障原理

关于Android Handler同步屏障那些事—CSDN

Android:同步屏障的简单理解和使用—CSDN

Base on: Android 13

Branch: android-13.0.0_r30

原理及注意

(1) 异步消息及同步屏障过程及原理

  • 设置异步消息:调用msg.setAsynchronous(true)将此消息设置为异步消息
  • 开启同步屏障:通过调用MessageQueue.postSyncBarrier()在消息队列头部插入一个Handler target=null的msg。调用postSyncBarrier()会返回一个token,通过该token标识此次同步屏障,并且在同步屏障使用完成之后需要通过该token解除同步屏障,否则同步屏障会一直生效
  • 处理消息:当MessageQueue.next()方法从队列中取出下一个待执行的msg时,如果存在msg.target == null的消息,则说明开启了同步屏障,于是只执行队列中msg.isAsynchronous()==true的异步消息
  • 取消同步屏障:调用MessageQueue.removeSyncBarrier(token)将Handler target=null的msg移除,于是就可以正常执行同步消息了

(2) 注意事项

  • 如果未开启同步屏障,则同步消息和异步消息没什么区别;
  • 同步屏障的标识消息(即target==null的msg)是直接放到队列头的,如果队列头的消息已满足执行条件(msg.when<=curTime),则会将该消息执行完成后再将屏障消息插入,如果有队列中有多个消息满足执行条件,也会将屏障消息放到队列头消息之后;
  • 异步消息也需要按照时间先后顺序执行;
  • 同步屏障的移除需要在异步消息中执行;

MQ.postSyncBarrier()—设置同步屏障

MQ=MessageQueue

调用postSyncBarrier()会返回一个token,通过该token标识此次同步屏障,并且在同步屏障使用完成之后需要通过该token解除同步屏障,否则同步屏障会一直生效。

public int postSyncBarrier() {
    return postSyncBarrier(SystemClock.uptimeMillis());
}

private int postSyncBarrier(long when) {
    // Enqueue a new sync barrier token.
    // We don't need to wake the queue because the purpose of a barrier is to stall it.
    synchronized (this) {
        // token表示屏障标识,解除屏障时需要
        final int token = mNextBarrierToken++;
        // 设置targetnull的同步屏障消息
        // token作为同步屏障标识消息的参数
        final Message msg = Message.obtain();
        msg.markInUse();
        msg.when = when;
        msg.arg1 = token;

        // mMessages队列中的第1个消息
        Message prev = null;
        Message p = mMessages;
        // 如果队列中的第1个消息已到执行时间,则pre=mMessages,p=mMessages.next
        if (when != 0) {
            while (p != null && p.when <= when) {
                prev = p;
                p = p.next;
            }
        }
        if (prev != null) { // invariant: p == prev.next
            // 将msg插入到队列头消息mMessages之后,mMessages.next=msg
            msg.next = p;
            prev.next = msg;
        } else {
            msg.next = p;
            mMessages = msg;
        }
        return token;
    }
}

Message.setAsynchronous()—设置异步消息

将消息设为异步消息实际上就是对该消息添加一个FLAG_ASYNCHRONOUS的flag。之后通过msg.isAsynchronous()判断是否为异步消息。

public void setAsynchronous(boolean async) {
    if (async) {
        flags |= FLAG_ASYNCHRONOUS;
    } else {
        flags &= ~FLAG_ASYNCHRONOUS;
    }
}

public boolean isAsynchronous() {
    return (flags & FLAG_ASYNCHRONOUS) != 0;
}

Handler.enqueueMessage()—异步消息入队

Handler.enqueueMessage()由sendMessageAtTime(msg, time)或sendMessageAtFrontOfQueue(msg)调用而来,只要是发送消息都要通过这两个方法(无论send或post方式),最终通过enqueueMessage()入队。

// 如果发送消息的Handler是mAsynchronous=true,则所有消息都是异步的
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
        long uptimeMillis) {
    msg.target = this;
    msg.workSourceUid = ThreadLocalWorkSource.getUid();

    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}


MessageQueue.enqueueMessage()相关代码
// 用户无法发送msg.target=null的消息
if (msg.target == null) {
    throw new IllegalArgumentException("Message must have a target.");
}


msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
    // 队列空闲,新消息到来
    // New head, wake up the event queue if blocked.
    msg.next = p;
    mMessages = msg;
    needWake = mBlocked;
} else {
    // Inserted within the middle of the queue.  Usually we don't have to wake
    // up the event queue unless there is a barrier at the head of the queue
    // and the message is the earliest asynchronous message in the queue.
    needWake = mBlocked && p.target == null && msg.isAsynchronous();
    Message prev;
    for (;;) {
        prev = p;
        p = p.next;
        if (p == null || when < p.when) {
            break;
        }
        if (needWake && p.isAsynchronous()) {
            needWake = false;
        }
    }
    msg.next = p; // invariant: p == prev.next
    prev.next = msg;
}

MQ.next()—处理异步消息

MQ=MessageQueue

// 调用MQ.next()在消息队列中取出一个消息
private static boolean loopOnce(final Looper me,
        final long ident, final int thresholdOverride) {
    Message msg = me.mQueue.next(); // might block
    if (msg == null) {
        // No message indicates that the message queue is quitting.
        return false;
    }
    //...
}

MessageQueue.next()关键代码如下:

final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// 如果当前是一个msg.target == null同步屏障标识
if (msg != null && msg.target == null) {
    // 则在队列中指针向后移动,直到发现异步消息
    // 在接下来的步骤的返回该异步消息
    // Stalled by a barrier.  Find the next asynchronous message in the queue.
    do {
        prevMsg = msg;
        msg = msg.next;
    } while (msg != null && !msg.isAsynchronous());
}

MQ.removeSyncBarrier()—移除同步屏障

public void removeSyncBarrier(int token) {...}

关键代码如下:

Message prev = null;
Message p = mMessages;
// 遍历队列,直到找到同步屏障标识消息
while (p != null && (p.target != null || p.arg1 != token)) {
    prev = p;
    p = p.next;
}
if (p == null) {
    throw new IllegalStateException("The specified message queue synchronization "
            + " barrier token has not been posted or has already been removed.");
}
final boolean needWake;
// 将队列头消息设置到同步屏障消息之后
if (prev != null) {
    prev.next = p.next;
    needWake = false;
} else {
    mMessages = p.next;
    needWake = mMessages == null || mMessages.target != null;
}

注意:同步屏障的移除需要在异步消息中执行。

AOSP中的应用

View更新

在日常开发APP时几乎不会使用到异步消息和同步屏障,而且相关SDK也不支持APP调用(除非使用反射),但在Framework层,有些场景是需要用到同步屏障的。

比如在View更新时,requestLayout()、invalidate()、performTraversals()、draw()等很多方法都会调用到ViewRootImpl.scheduleTraversals()以开启同步屏障。

ViewRootImpl.scheduleTraversals()

首先在ViewRootImpl.scheduleTraversals()中开启同步屏障,再调用Choreographer.postCallback()在之后的步骤发送异步消息。

void scheduleTraversals() {
    if (!mTraversalScheduled) {
        mTraversalScheduled = true;
        mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
        mChoreographer.postCallback(
                Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
        notifyRendererOfFramePending();
        pokeDrawLockIfNeeded();
    }
}

发送异步消息:

ViewRootImpl.scheduleTraversals() ->
Choreographer.postCallback() ->
Choreographer.postCallbackDelayed() ->
Choreographer.postCallbackDelayedInternal() ->
Choreographer.scheduleFrameLocked()

在Choreographer.postCallbackDelayedInternal()或Choreographer.scheduleFrameLocked()中发送异步消息。

private void postCallbackDelayedInternal(int callbackType,
        Object action, Object token, long delayMillis) {

    synchronized (mLock) {
        final long now = SystemClock.uptimeMillis();
        final long dueTime = now + delayMillis;
        mCallbackQueues[callbackType].addCallbackLocked(dueTime, action, token);

        if (dueTime <= now) {
            scheduleFrameLocked(now);
        } else {
            Message msg = mHandler.obtainMessage(MSG_DO_SCHEDULE_CALLBACK, action);
            msg.arg1 = callbackType;
            msg.setAsynchronous(true);
            mHandler.sendMessageAtTime(msg, dueTime);
        }
    }
}

移除同步屏障

添加View时,在ViewRootImpl.doTraversal()中移除同步屏障。

void doTraversal() {
    if (mTraversalScheduled) {
        mTraversalScheduled = false;
        mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);

        if (mProfile) {
            Debug.startMethodTracing("ViewAncestor");
        }

        performTraversals();

        if (mProfile) {
            Debug.stopMethodTracing();
            mProfile = false;
        }
    }
}

调用堆栈:

Handler.dispatchMessage() ->
Handler.handleCallback() ->
Choreographer$FrameDisplayEventReceiver.run() ->
Choreographer.doFrame() ->
Choreographer.doCallbacks() ->
Choreographer$CallbackRecord.run() ->
Choreographer$CallbackRecord.run() ->
ViewRootImpl$TraversalRunnable.run() ->
ViewRootImpl.doTraversal() ->

移除View时,在ViewRootImpl.unscheduleTraversals()中移除同步屏障。

void unscheduleTraversals() {
    if (mTraversalScheduled) {
        mTraversalScheduled = false;
        mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);
        mChoreographer.removeCallbacks(
                Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
    }
}