This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 359e08dbafbd7ee8c0ac9acf52925956cf704d55 Author: Patrik Nordwall <[email protected]> AuthorDate: Wed Jun 19 11:25:28 2019 +0200 roll 1 - scala * update to Akka 2.6.x first * and add dependeny `akka-cluster-sharding-typed` although the actors remain untyped in first rollout * define akka.cluster.sharding.number-of-shards config * same value as you have used previously for number of shards * same value as you will use for Typed * otherwise Typed node will not be able to join in roll 2 * add akka.cluster.sharding.typed.ShardingEnvelope to message extractor * sender and ask * add replyTo ActorRef to the messages that are using sender for the reply * change from `akka.pattern.ask` to `akka.pattern.extended.ask` to populate the replyTo field in the messages * use the replyTo field instead of sender() unless it is null --- akka-sample-sharding-scala/build.sbt | 1 + .../src/main/resources/application.conf | 3 +++ .../src/main/scala/sample/sharding/Device.scala | 10 +++++++--- .../src/main/scala/sample/sharding/Devices.scala | 19 ++++++++++++++----- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/akka-sample-sharding-scala/build.sbt b/akka-sample-sharding-scala/build.sbt index 67ecc9d..f18acde 100644 --- a/akka-sample-sharding-scala/build.sbt +++ b/akka-sample-sharding-scala/build.sbt @@ -20,6 +20,7 @@ lazy val `akka-sample-sharding-scala` = project javaOptions in run ++= Seq("-Xms128m", "-Xmx1024m"), libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion, "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, "org.scalatest" %% "scalatest" % "3.0.8" % Test ), diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf b/akka-sample-sharding-scala/src/main/resources/application.conf index 6cdfff3..193471a 100644 --- a/akka-sample-sharding-scala/src/main/resources/application.conf +++ b/akka-sample-sharding-scala/src/main/resources/application.conf @@ -23,3 +23,6 @@ akka { } } + +akka.cluster.sharding.number-of-shards = 100 + diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala index 19b0488..610f5f4 100644 --- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala +++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala @@ -13,7 +13,7 @@ object Device { case class RecordTemperature(deviceId: Int, temperature: Double) extends Command - case class GetTemperature(deviceId: Int) extends Command + case class GetTemperature(deviceId: Int, replyTo: ActorRef) extends Command case class Temperature(deviceId: Int, average: Double, @@ -38,13 +38,17 @@ class Device extends Actor with ActorLogging { ) context.become(counting(temperatures)) - case GetTemperature(id) => + case GetTemperature(id, replyTo) => val reply = if (values.isEmpty) Temperature(id, Double.NaN, Double.NaN, 0) else Temperature(id, average(values), values.last, values.size) - sender() ! reply + + if (replyTo == null) + sender() ! reply + else + replyTo ! reply } diff --git a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala index 5803a76..9d97f92 100644 --- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala +++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala @@ -5,7 +5,8 @@ import scala.util.Random import akka.actor._ import akka.cluster.sharding._ -import akka.pattern.ask +import akka.cluster.sharding.typed.ShardingEnvelope +import akka.pattern.extended.ask // note extended.ask import akka.pattern.pipe import akka.util.Timeout @@ -24,7 +25,9 @@ class Devices extends Actor with ActorLogging with Timers { private val extractEntityId: ShardRegion.ExtractEntityId = { case msg @ Device.RecordTemperature(id, _) => (id.toString, msg) - case msg @ Device.GetTemperature(id) => (id.toString, msg) + case msg @ Device.GetTemperature(id, _) => (id.toString, msg) + case ShardingEnvelope(_, msg @ Device.RecordTemperature(id, _)) => + (id.toString, msg) } private val numberOfShards = 100 @@ -32,11 +35,15 @@ class Devices extends Actor with ActorLogging with Timers { private val extractShardId: ShardRegion.ExtractShardId = { case Device.RecordTemperature(id, _) => (math.abs(id.hashCode) % numberOfShards).toString - case Device.GetTemperature(id) => + case Device.GetTemperature(id, _) => (math.abs(id.hashCode) % numberOfShards).toString // Needed if you want to use 'remember entities': case ShardRegion.StartEntity(id) => (math.abs(id.hashCode) % numberOfShards).toString + case ShardingEnvelope(_, Device.RecordTemperature(id, _)) => + (math.abs(id.hashCode) % numberOfShards).toString + case ShardingEnvelope(_, Device.GetTemperature(id, _)) => + (math.abs(id.hashCode) % numberOfShards).toString } val deviceRegion: ActorRef = ClusterSharding(context.system).start( @@ -70,9 +77,11 @@ class Devices extends Actor with ActorLogging with Timers { if (deviceId >= 40) { import context.dispatcher implicit val timeout = Timeout(3.seconds) - deviceRegion.ask(Device.GetTemperature(deviceId)).pipeTo(self) + deviceRegion + .ask(replyTo => Device.GetTemperature(deviceId, replyTo)) + .pipeTo(self) } else - deviceRegion ! Device.GetTemperature(deviceId) + deviceRegion ! Device.GetTemperature(deviceId, self) } case temp: Device.Temperature => --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
