This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 47bf16a8acd8743e78d8aca170a8f64f334aa582
Author: Patrik Nordwall <[email protected]>
AuthorDate: Fri Jul 12 07:37:27 2019 +0200

    roll 1 - java
    
    * update to Akka 2.6.x first
    * and add dependency `akka-cluster-sharding-typed` although the actors 
remain untyped in the first rollout
    * define akka.cluster.sharding.number-of-shards config
      * same value as you have used previously for number of shards
      * same value as you will use for Typed
      * otherwise Typed node will not be able to join in roll 2
    * add akka.cluster.sharding.typed.ShardingEnvelope to message extractor
    * sender and ask
      * add replyTo ActorRef to the messages that are using sender for the reply
      * change from `akka.pattern.Patterns.ask` to 
`akka.pattern.Patterns.askWithReplyTo` to populate
        the replyTo field in the messages
      * use the replyTo field instead of getSender() unless it is null
---
 akka-sample-sharding-java/build.sbt                     |  1 +
 .../src/main/java/sample/sharding/Device.java           | 17 ++++++++++++-----
 .../src/main/java/sample/sharding/Devices.java          | 15 ++++++++++++---
 .../src/main/resources/application.conf                 |  3 +++
 4 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/akka-sample-sharding-java/build.sbt 
b/akka-sample-sharding-java/build.sbt
index e1aa209..5b4f742 100644
--- a/akka-sample-sharding-java/build.sbt
+++ b/akka-sample-sharding-java/build.sbt
@@ -20,6 +20,7 @@ val `akka-sample-sharding-java` = project
     javacOptions in doc in Compile := Seq("-parameters", "-Xdoclint:none"),
     libraryDependencies ++= Seq(
       "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
+      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
       "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
       "org.scalatest" %% "scalatest" % "3.0.7" % Test
     ),
diff --git 
a/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java 
b/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java
index 7d6a717..8d722d8 100644
--- a/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java
+++ b/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java
@@ -32,10 +32,12 @@ public class Device extends AbstractActor {
 
   public static class GetTemperature implements Command {
     public final int deviceId;
+    public final ActorRef replyTo;
 
     @JsonCreator
-    public GetTemperature(int deviceId) {
+    public GetTemperature(int deviceId, ActorRef replyTo) {
       this.deviceId = deviceId;
+      this.replyTo = replyTo;
     }
   }
 
@@ -77,13 +79,18 @@ public class Device extends AbstractActor {
   }
 
   private void receiveGetTemperature(GetTemperature cmd) {
+    Temperature reply;
     if (temperatures.isEmpty()) {
-      getSender().tell(new Temperature(cmd.deviceId, Double.NaN,
-        Double.NaN, 0), getSelf());
+      reply = new Temperature(cmd.deviceId, Double.NaN, Double.NaN, 0);
     } else {
-      getSender().tell(new Temperature(cmd.deviceId, average(temperatures),
-        temperatures.get(temperatures.size() - 1), temperatures.size()), 
getSelf());
+      reply = new Temperature(cmd.deviceId, average(temperatures),
+        temperatures.get(temperatures.size() - 1), temperatures.size());
     }
+
+    if (cmd.replyTo == null)
+      getSender().tell(reply, getSelf());
+    else
+      cmd.replyTo.tell(reply, getSelf());
   }
 
   private double sum(List<Double> values) {
diff --git 
a/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java 
b/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java
index 2ada983..72d4e00 100644
--- a/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java
+++ b/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java
@@ -14,6 +14,7 @@ import akka.event.LoggingAdapter;
 import akka.cluster.sharding.ClusterSharding;
 import akka.cluster.sharding.ClusterShardingSettings;
 import akka.cluster.sharding.ShardRegion;
+import akka.cluster.sharding.typed.ShardingEnvelope;
 import akka.pattern.Patterns;
 
 public class Devices extends AbstractActorWithTimers {
@@ -33,7 +34,13 @@ public class Devices extends AbstractActorWithTimers {
         return String.valueOf(((Device.RecordTemperature) message).deviceId);
       else if (message instanceof Device.GetTemperature)
         return String.valueOf(((Device.GetTemperature) message).deviceId);
-      else
+      else if (message instanceof ShardingEnvelope) {
+        ShardingEnvelope envelope = (ShardingEnvelope) message;
+        if (envelope.message() instanceof Device.RecordTemperature)
+          return String.valueOf(((Device.RecordTemperature) 
envelope.message()).deviceId);
+        else
+          return null;
+      } else
         return null;
     }
   };
@@ -88,10 +95,12 @@ public class Devices extends AbstractActorWithTimers {
   private void receiveReadTemperatures() {
     for (int deviceId = 0; deviceId < numberOfDevices; deviceId++) {
       if (deviceId >= 40) {
-        CompletionStage<Object> reply = Patterns.ask(deviceRegion, new 
Device.GetTemperature(deviceId), Duration.ofSeconds(3));
+        final int id = deviceId;
+        CompletionStage<Object> reply = Patterns.askWithReplyTo(deviceRegion, 
replyTo ->
+          new Device.GetTemperature(id, replyTo), Duration.ofSeconds(3));
         Patterns.pipe(reply, getContext().getDispatcher()).to(getSelf());
       } else {
-        deviceRegion.tell(new Device.GetTemperature(deviceId), getSelf());
+        deviceRegion.tell(new Device.GetTemperature(deviceId, getSelf()), 
getSelf());
       }
     }
   }
diff --git a/akka-sample-sharding-java/src/main/resources/application.conf 
b/akka-sample-sharding-java/src/main/resources/application.conf
index 903f3af..8af5c0b 100644
--- a/akka-sample-sharding-java/src/main/resources/application.conf
+++ b/akka-sample-sharding-java/src/main/resources/application.conf
@@ -27,3 +27,6 @@ akka {
   }
 
 }
+
+akka.cluster.sharding.number-of-shards = 100
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to