hangscer

Akka Cluster简介

2017/12/23

  akka集群是高容错、去中心化、不存在单点故障以及不存在单点瓶颈的集群。它使用gossip协议通信以及具备故障自动检测功能。

Gossip收敛
  集群中每一个节点被其他节点监督(默认的最大数量为5)。集群中的节点互相监督着,某节点所监督的状态也正在被其他监督着。通过gossip协议,节点向其他节点传递自己所见节点的最新状态(Up、Joining等等),同时节点也在接收来自其他节点的信息,这些信息包括哪些节点以及这些节点对应的状态,并这些节点加入到自己的seen表里去,表示自己已经看见了这些节点的最新状态了,当所有的节点都把其他节点“看见”了后,我们可以说”Gossip收敛”完成了。

  根据以上陈述,当集群中某节点不可达(unreachable)时,gossip收敛不能完成。那些不可达的节点需要变成可达状态(reachable)或者down状态,收敛才能进行。
  akka集群不存在leader选举,但是存在leader节点,但是leader节点可以转移,leader负责执行leader action,当每次收敛完成后,leader需要做三件事:

  • 将处于joining状态节点变更为Up状态, 即joining->up
  • leaving->exiting
  • exiting->removed

failure Detector

  集群中,一个节点被其他节点监督(默认最大数量为5),任何一个节点被探测到不可达时,那么这个消息将被通过gossip协议传播到其他节点去,其他节点也将此节点标为不可达。同时故障检测机制也会将节点从不可达标记为可达,同时扩散给其他节点。
关于评判一个节点是否可达的方式是利用历史数据中每次心跳时间间隔的平均值与心跳次数为均方差去构建一个正太分布,F是这个分布的密度分布函数,利用以下公式:

phi = -log10(1 - F(timeSinceLastHeartbeat))

  phi反应了当前网络的好坏情况,当akka.cluster.failure-detector.threshold阈值配置不当时,并不是等待某个心跳检测超时时,才会把节点标记为不可达。其值默认为18,想要得到更高的灵敏度,需要把阈值设置降低。

实践

  编程方式构建集群
  akka.tcp://myCluster@127.0.0.1:2551节点:

application.conf:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
akka {
actor {
provider = cluster
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = []
}
}
1
2
3
4
5
6
7
8
9
10
package nathan
import akka.actor.{Actor, ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory
object Main extends App {
val actorSystem = ActorSystem("myCluster", ConfigFactory.load())
Cluster(actorSystem).join(Address(protocol = "akka.tcp",system = "myCluster",host = "127.0.0.1",port = 2551))
}

  上述代码Cluster(actorSystem).join(address)是以address为基础创建集群,集群的名称为”myCluster”,其中包含”akka.tcp://myCluster@127.0.0.1:2551”的节点。集群的名称为其第一个加入的节点的名字决定,其他后加入的节点的名称应当与其保持一致。当这个单节点集群创建完毕后,这个单节点就成为seedNode,也就是说,其他节点通过向种子节点发出Join指令,就可以加入集群。

  akka.tcp://myCluster@127.0.0.1:2552节点
application.conf

1
2
3
4
5
6
7
8
9
10
11
12
akka {
actor {
provider = cluster
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
}
}
1
2
3
4
5
6
7
8
9
10
package nathan
import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory
object Main extends App {
val actorSystem = ActorSystem("myCluster", ConfigFactory.load())
Cluster(actorSystem).joinSeedNodes(List(Address(protocol = "akka.tcp",system = "myCluster1",host = "127.0.0.1",port = 2551)))
}

  Cluster(actorSystem).joinSeedNodes(List(address))代码作用向某个种子节点发出Join命令以加入集群。这里填写的种子节点越多越好,这样消息在集群中扩散可以更快。

监听集群节点状态

  集群时间有如下几种有如下几种:MemberJoinedMemberWeaklyUpMemberUpMemberLeftMemberExitedMemberRemovedLeaderChangedRoleLeaderChangedUnreachableMemberReachableMember等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class ListenClusterActor extends Actor {
val cluster = Cluster(context.system)
override def preStart(): Unit = {
cluster.subscribe(self, InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
}
override def postStop(): Unit = cluster.unsubscribe(self)
override def receive: Receive = {
case MemberJoined(member) =>
println("join:" + member)
case MemberUp(member) =>
println("up:" + member)
case MemberExited(member) =>
println("exited:" + member)
case MemberRemoved(member,previousStatus) =>
println("removed:" + member+" before status:"+previousStatus)
case UnreachableMember(member) =>
println("unreachable:" + member)
}
}

当其他节点加入集群时和离开时,打印如下:

1
2
3
4
join:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Joining)
up:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Up)
exited:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Exiting)
removed:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Removed) before status:Exiting