hangscer

Akka中的Message Stash

2018/02/24

    Akka是JVM平台上构建高并发、分布式以及高度容错应用的工具包,其基于Actor模型实现了m:n的线程模式(m大于n,m是actor实例的个数,n是线程数量)。akka程序中每个actor实例都扮演一种角色或者实现某一个功能,每一个actor实例都有对应的消息邮箱,actor从自己的邮箱中消费消息,actor之间通过向对方的邮箱发送消息互相交流。常常遇到这样一种情形:某个actor需要在A和B两种状态下切换,以及分别在每种状态下接收并处理各自对应的信息Messsage-A类型和Messsage-B类型的消息实例。由于当处于A状态时,仍然会收到本该处于B状态时处理的Messsage-B类型的消息实例;当处于B状态时,亦是如此。
    解决思路是把当前无法处理的消息暂存(stash),在切换对应的状态时把所有的暂存的消息添加到消息队列最前方,这些被释放出来的消息仍然按照它们被接收到顺序依次处理。

简单应用

    实现一个akka集群监控功能(AkkaClusterHealthActor),等待http请求,在响应请求时向集群中所有其他节点请求健康状态,等待所有节点全部返回健康信息,汇总信息后,再完成http请求,最后回到最初状态:等待http请求。

编码分析

    以上问题可以大致分为两种状态:等待http请求、等待其他节点的健康状态,但是在等待其他节点健康状态时,仍会收到http请求,需要暂存http请求,稍后释放做进一步处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
object SomeApp {
..... //程序初始化 ActorSystem、ExecutorService ....
val clusterhealthActor = system.actorOf(Props[AkkaClusterHealthActor], "clusterhealthActor")
...
def main(args: Array[String]): Unit = {
...
val router =
path("healthState"){ //akka-http dsl
get{
imperativelyComplete {
httpCtx =>
clusterhealthActor ! httpCtx //向该actor发送http请求实例
}
}
}
...
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class AkkaClusterHealthActor extends Actor with Stash {
val cluster = Cluster(context.system) //获取集群状态信息,包括节点的列表、地址、状态等等
override def preStart(): Unit = context.become(waitingForHttpRequest) //actor初始化时
override def receive: Receive = {
case _ =>
}
def waitingForHttpRequest: Receive = {
case httpCtx: ImperativeRequestContext => //等待http请求
val members = cluster.state.members.toList //包含自身节点
members.foreach { member =>
context.actorSelection(s"${member.address}/user/akkaMonitorActor") ! "FetchState" //请求集群的节点健康状态 ,member.address akka.tcp://itoa@127.0.0.1:2551
}
context.become(waitingForAkkaNode(httpCtx, members.length, Nil)) //利用FSM避免了局部变量的产生,members.length 需要等待几个节点的返回,有可能超时
context.setReceiveTimeout(3 seconds)
}
def waitingForAkkaNode(ctx: ImperativeRequestContext, memberNum: Int, results: List[StateResult]): Receive = { //集群个数 用于接收到的信息个数
case nodeStateResult: StateResult =>
(memberNum - 1) match {
case 0 => //完成任务 取消超时设置
context.setReceiveTimeout(Duration.Inf)
ctx.complete(nodeStateResult)
context.become(waitingForHttpRequest)
unstashAll() //完成该请求,处理其他请求
case _ => //仍有部分节点没有返回健康状态结果,继续等待
context.become(waitingForAkkaNode(ctx, memberNum - 1, nodeStateResult :: results)) //这里的代码有点类似于 `尾递归`
}
case ReceiveTimeout =>
context.setReceiveTimeout(Duration.Inf) //完成任务 取消超时设置
ctx.complete(results) //返回部分结果,完成http请求
context.become(waitingForHttpRequest)
unstashAll() //完成该次请求,释放暂存的http请求,处理其他请求
case _ =>
stash() //处理某个请求中,但是收到了其他http请求,暂存消息
}
}

    以上代码大致实现了具体逻辑,actor在waitingForHttpRequestwaitingForAkkaNode两种状态之间来回切换,让我们来看看akka中关于该功能的源码。

1
2
...
trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueueSemantics]

    重启之前(并不是重启之后)应当释放所有暂存的消息,同样的,而且需要在关闭之后释放暂存的消息。

1
2
3
4
5
6
7
...
trait UnrestrictedStash extends Actor with StashSupport {
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
try unstashAll() finally super.preRestart(reason, message)
}
override def postStop(): Unit = try unstashAll() finally super.postStop()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private[akka] trait StashSupport {
private[akka] def context: ActorContext
private[akka] def self: ActorRef
//基于`scala.collection.immutable.Vector`,当存储的消息数量很大时,也可以获得很好的性能
private var theStash = Vector.empty[Envelope]
// ActorContext是ActorCell的子类
private def actorCell = context.asInstanceOf[ActorCell]
//可为暂存的消息设置数量限制,通过`stash-capacity`配置,默认为-1,即容量不限
private val capacity: Int =
context.system.mailboxes.stashCapacity(context.props.dispatcher, context.props.mailbox)
private[akka] val mailbox: DequeBasedMessageQueueSemantics = {
actorCell.mailbox.messageQueue match {
case queue: DequeBasedMessageQueueSemantics ⇒ queue
case other ⇒ throw ActorInitializationException
}
}
//1. 获取当前的消息,不能将同一个消息存两次(不能调用两次`stash()`函数,`eq`用于比较对象之间的内存地址)
//2. capacity默认-1,存储容量不限
//3. 暂存的消息被添加到`theStash`集合的头部(遍历或者迭代时,从尾向头遍历)
def stash(): Unit = {
val currMsg = actorCell.currentMessage
if (theStash.nonEmpty && (currMsg eq theStash.last))
throw new IllegalStateException(s"Can't stash the same message $currMsg more than once")
if (capacity <= 0 || theStash.size < capacity)
theStash = theStash :+ currMsg
else throw new StashOverflowException
}
//当`others`很小时,而`theStash`很大,该方法很高效
//从头向尾依次将`others`中元素添加到`theStash`尾
private[akka] def prepend(others: immutable.Seq[Envelope]): Unit =
theStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)
...
//`reverseIterator`反序输出`theStash`中的每一封邮件,依次添加到邮箱的头部
def unstashAll(): Unit = {
try {
val i = theStash.reverseIterator
while (i.hasNext) enqueueFirst(i.next())
} finally {
theStash = Vector.empty[Envelope]
}
}
...
//把该封信放在邮箱的第一个位置
private def enqueueFirst(envelope: Envelope): Unit = {
mailbox.enqueueFirst(self, envelope)
envelope.message match {
case Terminated(ref) ⇒ actorCell.terminatedQueuedFor(ref)
case _ ⇒
}
}
}

    基于scala.collection.immutable.Vector的实现,在存储数量很大时,仍然可以获得很好的性能。
    同一个消息不能被暂存两次,否则程序抛出IllegalStateException
    theStash = theStash :+ currMsgtheStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)val i = theStash.reverseIterator这几句关于集合操作与迭代的代码确保了:这些被释放出来的消息依然按照它们被接收到顺序依次处理,先来后到规则要遵守。