This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 359e08dbafbd7ee8c0ac9acf52925956cf704d55
Author: Patrik Nordwall <[email protected]>
AuthorDate: Wed Jun 19 11:25:28 2019 +0200

    roll 1 - scala
    
    * update to Akka 2.6.x first
    * and add dependeny `akka-cluster-sharding-typed` although the actors 
remain untyped in first rollout
    * define akka.cluster.sharding.number-of-shards config
      * same value as you have used previously for number of shards
      * same value as you will use for Typed
      * otherwise Typed node will not be able to join in roll 2
    * add akka.cluster.sharding.typed.ShardingEnvelope to message extractor
    * sender and ask
      * add replyTo ActorRef to the messages that are using sender for the reply
      * change from `akka.pattern.ask` to `akka.pattern.extended.ask` to 
populate
        the replyTo field in the messages
      * use the replyTo field instead of sender() unless it is null
---
 akka-sample-sharding-scala/build.sbt                  |  1 +
 .../src/main/resources/application.conf               |  3 +++
 .../src/main/scala/sample/sharding/Device.scala       | 10 +++++++---
 .../src/main/scala/sample/sharding/Devices.scala      | 19 ++++++++++++++-----
 4 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/akka-sample-sharding-scala/build.sbt 
b/akka-sample-sharding-scala/build.sbt
index 67ecc9d..f18acde 100644
--- a/akka-sample-sharding-scala/build.sbt
+++ b/akka-sample-sharding-scala/build.sbt
@@ -20,6 +20,7 @@ lazy val `akka-sample-sharding-scala` = project
     javaOptions in run ++= Seq("-Xms128m", "-Xmx1024m"),
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
+      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
       "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
       "org.scalatest" %% "scalatest" % "3.0.8" % Test
     ),
diff --git a/akka-sample-sharding-scala/src/main/resources/application.conf 
b/akka-sample-sharding-scala/src/main/resources/application.conf
index 6cdfff3..193471a 100644
--- a/akka-sample-sharding-scala/src/main/resources/application.conf
+++ b/akka-sample-sharding-scala/src/main/resources/application.conf
@@ -23,3 +23,6 @@ akka {
   }
 
 }
+
+akka.cluster.sharding.number-of-shards = 100
+
diff --git 
a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala 
b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
index 19b0488..610f5f4 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Device.scala
@@ -13,7 +13,7 @@ object Device {
   case class RecordTemperature(deviceId: Int, temperature: Double)
       extends Command
 
-  case class GetTemperature(deviceId: Int) extends Command
+  case class GetTemperature(deviceId: Int, replyTo: ActorRef) extends Command
 
   case class Temperature(deviceId: Int,
                          average: Double,
@@ -38,13 +38,17 @@ class Device extends Actor with ActorLogging {
       )
       context.become(counting(temperatures))
 
-    case GetTemperature(id) =>
+    case GetTemperature(id, replyTo) =>
       val reply =
         if (values.isEmpty)
           Temperature(id, Double.NaN, Double.NaN, 0)
         else
           Temperature(id, average(values), values.last, values.size)
-      sender() ! reply
+
+      if (replyTo == null)
+        sender() ! reply
+      else
+        replyTo ! reply
 
   }
 
diff --git 
a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala 
b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
index 5803a76..9d97f92 100644
--- a/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
+++ b/akka-sample-sharding-scala/src/main/scala/sample/sharding/Devices.scala
@@ -5,7 +5,8 @@ import scala.util.Random
 
 import akka.actor._
 import akka.cluster.sharding._
-import akka.pattern.ask
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.pattern.extended.ask // note extended.ask
 import akka.pattern.pipe
 import akka.util.Timeout
 
@@ -24,7 +25,9 @@ class Devices extends Actor with ActorLogging with Timers {
 
   private val extractEntityId: ShardRegion.ExtractEntityId = {
     case msg @ Device.RecordTemperature(id, _) => (id.toString, msg)
-    case msg @ Device.GetTemperature(id)       => (id.toString, msg)
+    case msg @ Device.GetTemperature(id, _)    => (id.toString, msg)
+    case ShardingEnvelope(_, msg @ Device.RecordTemperature(id, _)) =>
+      (id.toString, msg)
   }
 
   private val numberOfShards = 100
@@ -32,11 +35,15 @@ class Devices extends Actor with ActorLogging with Timers {
   private val extractShardId: ShardRegion.ExtractShardId = {
     case Device.RecordTemperature(id, _) =>
       (math.abs(id.hashCode) % numberOfShards).toString
-    case Device.GetTemperature(id) =>
+    case Device.GetTemperature(id, _) =>
       (math.abs(id.hashCode) % numberOfShards).toString
     // Needed if you want to use 'remember entities':
     case ShardRegion.StartEntity(id) =>
       (math.abs(id.hashCode) % numberOfShards).toString
+    case ShardingEnvelope(_, Device.RecordTemperature(id, _)) =>
+      (math.abs(id.hashCode) % numberOfShards).toString
+    case ShardingEnvelope(_, Device.GetTemperature(id, _)) =>
+      (math.abs(id.hashCode) % numberOfShards).toString
   }
 
   val deviceRegion: ActorRef = ClusterSharding(context.system).start(
@@ -70,9 +77,11 @@ class Devices extends Actor with ActorLogging with Timers {
         if (deviceId >= 40) {
           import context.dispatcher
           implicit val timeout = Timeout(3.seconds)
-          deviceRegion.ask(Device.GetTemperature(deviceId)).pipeTo(self)
+          deviceRegion
+            .ask(replyTo => Device.GetTemperature(deviceId, replyTo))
+            .pipeTo(self)
         } else
-          deviceRegion ! Device.GetTemperature(deviceId)
+          deviceRegion ! Device.GetTemperature(deviceId, self)
       }
 
     case temp: Device.Temperature =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to