博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
akka cluster 初体验
阅读量:6889 次
发布时间:2019-06-27

本文共 3046 字,大约阅读时间需要 10 分钟。

cluster 配置

akka {  actor {    provider = "akka.cluster.ClusterActorRefProvider"  }  remote {    log-remote-lifecycle-events = off    enabled-transports = ["akka.remote.netty.tcp"]    netty.tcp {      hostname = "127.0.0.1"      port = 0    }  }  cluster {    seed-nodes = [      "akka.tcp://ClusterSystem@127.0.0.1:2551",      "akka.tcp://ClusterSystem@127.0.0.1:2552"]    auto-down-unreachable-after = 10s  }  persistence {    journal.plugin = "akka.persistence.journal.leveldb-shared"    journal.leveldb-shared.store {      # DO NOT USE 'native = off' IN PRODUCTION !!!      native = off      dir = "target/shared-journal"    }    snapshot-store.local.dir = "target/snapshots"  }  log-dead-letters = off}

  

actor.provider 设定选取 clusterActorRefProvider,在 IDE 中该 String 可以跳转到 ClusterActorRefProvider,从程序的注释来看,actorRef provider 其实并不是说 actor 是怎么提供的,它是为了引入 cluster extension,并自动启动 cluster 

i.e. the cluster will automatically be started when the 'ClusterActorRefProvider' is used.

 

创建三个 actorSystem 组成 cluster

def main(args: Array[String]): Unit = {    if (args.isEmpty)      startup(Seq("2551", "2552", "0"))    else      startup(args)  }  def startup(ports: Seq[String]): Unit = {    ports foreach { port =>      // Override the configuration of the port      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).        withFallback(ConfigFactory.load())      // Create an Akka system      val system = ActorSystem("ClusterSystem", config)      system.actorOf(ClusterSingletonManager.props(Master.props(Duration(99, "second")), "active",        PoisonPill, None), "master")    }  }

  

创建 actorSystem 时的 config 重写了 akka.remote.netty.tcp.port,因为默认的配置只有 port = 0 这个选项。因为三个 cluster 都在本机启动,所以 hostname 不需要额外声明,重用 application.conf 中的 127.0.0.1 

 

另外,ActorSystem 的名字必须统一,都是 ClusterSystem,这是在 application.conf 中的 seed-nodes 中声明的,它是 cluster 的 id。

我猜,当 seed-nodes 都挂掉了,新的 actorSystem 应该就无法加入 cluster 了,因为光靠 cluster id 已经无法找到组织了

 

 

object Master {  val ResultsTopic = "results"  def props(workTimeout: FiniteDuration): Props =    Props(classOf[Master], workTimeout)  case object Job  case object ParentGreetings}class Master(workTimeout: FiniteDuration) extends Actor with ActorLogging {  context.system.scheduler.schedule(Duration(10, "second"), Duration(5, "second"), self, Job)  val timeout: Timeout = 10 second  override def receive: Receive = {    case Job =>      val info = ClusterProtocol.selfInfo(self)      log.info(info._1 + ": " + info._2 + ": " + info._3)      context.system.actorSelection("user/master/active").resolveOne(4 second).map(actor => actor ! ParentGreetings)    case ParentGreetings =>      log.info("greetings from parent")  }}

  

上面是 cluster singleton actor 的实现,它有两件事要做,一是打印出自己的路径,而是找到自己。需要注意的是,resolveOne 不能 await(阻塞式的等),否者会报异常,actor not found

 

上面的代码实现了一个简单的 cluster singleton,具体的表现是,当一个 cluster 有多个 actorSystem 时,当一个 actorSystem 挂掉时,master actor 会继续提供服务,且此 actor 的 instance 有且只有一个。

 

sbt "run-main packageName.mainName portNum" 可以启动 actorSystem 在端口 portNum 上,不加 端口号,会一下启动三个 actorSystem,但是不方便模拟一个 actorSystem 挂掉的情况。

转载地址:http://qqtbl.baihongyu.com/

你可能感兴趣的文章
windows下重装xampp并做mysql数据迁移的步骤
查看>>
Java日志组件间关系
查看>>
聊聊前端国际化文案该如何处理
查看>>
JS难点之hoist
查看>>
“独角兽”企业都爱选择腾讯云,背后原因值得考究
查看>>
浅析 Vue 2.6 中的 nextTick 方法
查看>>
199. Binary Tree Right Side View
查看>>
配置SpringBoot方便的切换jar和war
查看>>
2018最佳GAN论文回顾(下)
查看>>
Vue使用element-ui所遇BUG与需求集结(二)
查看>>
弹性公网EIP,让网络更自由、灵活
查看>>
一对一直播源码都实现了哪几种常见的优化技术? ...
查看>>
Unity学习系列一简介
查看>>
利用Python框架pyxxnet_project实现的网络服务
查看>>
一个最简单的WebSocket hello world demo
查看>>
C# 8.0的三个令人兴奋的新特性
查看>>
关于ip_conntrack跟踪连接满导致网络丢包问题的分析
查看>>
烂泥:linux学习之VNC远程控制(一)
查看>>
如何解决Xshell使用时中文字体是躺倒显示的问题
查看>>
Scala函数的定义的几种写法
查看>>