hangscer

对于akka系统中的dispatcher的理解

2017/11/04

对于akka系统中的dispatcher的理解

One

        计算机组成原理中,对时间片的解释是:cpu给每个线程分配一个时间段,成为线程的时间片,通过必要的调度算法,达到微观串行,宏观并行的多线程效果。对于akka框架的actor实现,本人的理解是这样的:

actor就像时间片,但是不同的是,这个“片”的度量是该actor处理消息的数量衡量,比如现有配置说每个actor处理完100个消息后,需要把线程资源让出给下一个actor使用(非抢占式调度),这样就实现了actor并发,也就得到了actor:thread是m:n的效果(其中m>n)。(erlang的调度模式似乎更先进。)

        那么以上论点是正确的吗??
        如果观点是正确,那么我们在akka源码中就会类似以下逻辑:

存在某个累计变量(原子性,用于并发),用于记录某个dispatcher或者actor处理多个若干消息的数量,并且还存在对于actor的执行状态的标识。

        首先下载源码,对于此,我唯一了解的事实是这样的逻辑中会使用到叫做throughput的变量,不一定百分之百相同,其他的变量可能包含这个单词,(至于我为什么会想到搜索这个单词呢?在下文会提到的)。


SerializedSuspendableExecutionContext这类中找到了,以下是部分源码:

1
2
3
4
5
6
7
8
9
10
11
private[akka] object SerializedSuspendableExecutionContext {
final val Off = 0
final val On = 1
final val Suspended = 2
def apply(throughput: Int)(implicit context: ExecutionContext): SerializedSuspendableExecutionContext =
new SerializedSuspendableExecutionContext(throughput)(context match {
case s: SerializedSuspendableExecutionContext ⇒ s.context
case other ⇒ other
})
}

以上代码根据throughputexecuteContext创建SerializedSuspendableExecutionContextexecuteContext可以是threadPool也可以是forkjoinPool等几种,用于作为处理邮箱中消息的处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private[akka] final class SerializedSuspendableExecutionContext(throughput: Int)(val context: ExecutionContext)
extends AbstractNodeQueue[Runnable] with Runnable with ExecutionContext {
import SerializedSuspendableExecutionContext._
require(throughput > 0, s"SerializedSuspendableExecutionContext.throughput must be greater than 0 but was $throughput")
private final val state = new AtomicInteger(Off)
@tailrec private final def addState(newState: Int): Boolean = {
val c = state.get
state.compareAndSet(c, c | newState) || addState(newState)
}
@tailrec private final def remState(oldState: Int) {
val c = state.get
if (state.compareAndSet(c, c & ~oldState)) attach() else remState(oldState)
}
/**
* Resumes execution of tasks until `suspend` is called,
* if it isn't currently suspended, it is a no-op.
* This operation is idempotent.
*/
final def resume(): Unit = remState(Suspended)
/**
* Suspends execution of tasks until `resume` is called,
* this operation is idempotent.
*/
final def suspend(): Unit = addState(Suspended)
final def run(): Unit = {
@tailrec def run(done: Int): Unit =
if (done < throughput && state.get == On) {
poll() match {
case null ⇒ ()
case some ⇒
try some.run() catch { case NonFatal(t) ⇒ context reportFailure t }
run(done + 1)
}
}
try run(0) finally remState(On)
}
final def attach(): Unit = if (!isEmpty() && state.compareAndSet(Off, On)) context execute this
override final def execute(task: Runnable): Unit = try add(task) finally attach()
override final def reportFailure(t: Throwable): Unit = context reportFailure t
/**
* O(N)
* @return the number of Runnable's currently enqueued
*/
final def size(): Int = count()
override final def toString: String = (state.get: @switch) match {
case 0"Off"
case 1"On"
case 2"Off & Suspended"
case 3"On & Suspended"
}
}

        以上代码可以得到以下信息:SerializedSuspendableExecutionContext继承了AbstractNodeQueue[Runnable],所以内部维护了一个队列,消息本身泛化为Runnable的队列,继承了Runnable,所以内部存在一个run方法体,需要executeContext实例来运行它。
        内部存在OnOffSuspended三种状态标记actor的执行状态,以及与之对应切换的函数。
        run()函数里有个变量done用于累积actor究竟处理了多少个消息,如果多于throughput,那么actor就不会再去处理消息,小于throughput而且状态为On的话,继续从Queue[Runnable]中取出runnable实例执行并且把done+1代入下一次递归。
        attach()用于调用executeContext执行run(),并且判断actor的状态是否初始化正确。
        execute(task)是覆写了ExecuteContext中的方法,从源码中可以看出,会把任务添加到队列中去,但是这个队列只是一个接口,并没有具体实现,所以这个队列可以是FIFO,也可以是优先队列等等。

那么还有哪些问题没有搞清楚?

        比如akka是如何选择actor实例并分配给它们线程资源的,按照何种算法?这些疑惑会搞清楚的。

Two

        MessageDispatcher是akka的核心,所有的MessageDispathcher都实现了一个ExecuteContext。而ExecuteContext可以从线程池(newFixedThreadPool等等ExecuteService)或者fork-joinPool中创建,可以用于执行任意的代码段,比如Future。

1
2
3
4
5
6
7
8
9
10
my-dispatcher{
type=Dispatcher
executor="fork-join-executor"//默认的dispatcher是基于forkjoinPool
fork-join-executor{
parallelism-factor=2.0 //可以理解为cpu的核心的数量
parallelism-min = 2 //每个核心上最小的并发数量
parallelism-max = 10 // 每个核心上最大的并发数量
}
throughput=100 // actor处理100个消息后,thread资源让给其他actor执行
}

有三种dispatcher:

  • Dispatcher,把一系列的actor绑定到特定线程池上。akka默认的dispathcer就是这个。
  • PinnedDispatcher,给每一个actor实例分配一个新的线程池,池中只有一个线程。
  • CallingThreadDispatcher,该dispatcher不会创建任何新的线程,而是利用当前所在的线程,一般用于测试actor。

        正如文章开头提到akka的actor模型是非抢占式调度,换句话说,如果某个actor内部存在阻塞操作,如果采用默认dispatcher配置,那么可能导致actor不并发。
以下代码模拟耗时阻塞操作:

1
2
3
4
5
6
7
class MyActor extend Actor{
override def receive:Receive={
case i:Int =>
Thread.sleep(3000) // blocking the current thread 3 seconds
println("finished operation:"+i)
}
}

        以上代码显然问题多多,很自然的可以想到把阻塞方法用Future封装,不就从阻塞变为非阻塞了嘛,以下是补救措施:

1
2
3
4
5
6
7
8
9
10
class MyActor extend Actor{
implict val ec=context.dispatcher // system.dispatcher
override def receive:Receive={
case i:Int =>
Future{
Thread.sleep(3000) // blocking current thread 3 seconds
println("finished operation:"+i)
}
}
}

        以上代码存在问题:akka系统需要从dispatcher中拿出线程资源来进行actor之间的调度以及协调。可是receive中的Future占用了系统的dispatcher资源,可能把系统的dispatcher给阻塞掉了,由此导致akka系统本身被阻塞。
        解决方案是为包裹了阻塞操作的future分配单独的dispatcher:配置一个专用于处理阻塞操作的dispacther配合代码操作:

1
2
3
4
5
6
7
8
my-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 16
}
throughput = 1
}
1
2
3
4
5
6
7
8
9
10
11
class SeparateDispatcherFutureActor extends Actor {
implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("my-blocking-dispatcher")
def receive = {
case i: Int =>
println(s"Calling blocking Future: ${i}")
Future {
Thread.sleep(5000) //block for 5 seconds
println(s"Blocking future finished ${i}")
}
}
}

        以上代码中把Future代码段分配在专门用于处理阻塞操作的dispatcher上去,解决问题😊。