This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 47bf16a8acd8743e78d8aca170a8f64f334aa582 Author: Patrik Nordwall <[email protected]> AuthorDate: Fri Jul 12 07:37:27 2019 +0200 roll 1 - java * update to Akka 2.6.x first * and add dependeny `akka-cluster-sharding-typed` although the actors remain untyped in first rollout * define akka.cluster.sharding.number-of-shards config * same value as you have used previously for number of shards * same value as you will use for Typed * otherwise Typed node will not be able to join in roll 2 * add akka.cluster.sharding.typed.ShardingEnvelope to message extractor * sender and ask * add replyTo ActorRef to the messages that are using sender for the reply * change from `akka.pattern.Patterns.ask` to `akka.pattern.Patterns.askWithReply` to populate the replyTo field in the messages * use the replyTo field instead of getSender() unless it is null --- akka-sample-sharding-java/build.sbt | 1 + .../src/main/java/sample/sharding/Device.java | 17 ++++++++++++----- .../src/main/java/sample/sharding/Devices.java | 15 ++++++++++++--- .../src/main/resources/application.conf | 3 +++ 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/akka-sample-sharding-java/build.sbt b/akka-sample-sharding-java/build.sbt index e1aa209..5b4f742 100644 --- a/akka-sample-sharding-java/build.sbt +++ b/akka-sample-sharding-java/build.sbt @@ -20,6 +20,7 @@ val `akka-sample-sharding-java` = project javacOptions in doc in Compile := Seq("-parameters", "-Xdoclint:none"), libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion, "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, "org.scalatest" %% "scalatest" % "3.0.7" % Test ), diff --git a/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java b/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java index 7d6a717..8d722d8 100644 --- a/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java +++ b/akka-sample-sharding-java/src/main/java/sample/sharding/Device.java @@ -32,10 +32,12 @@ public class Device extends AbstractActor { public static class GetTemperature implements Command { public final int deviceId; + public final ActorRef replyTo; @JsonCreator - public GetTemperature(int deviceId) { + public GetTemperature(int deviceId, ActorRef replyTo) { this.deviceId = deviceId; + this.replyTo = replyTo; } } @@ -77,13 +79,18 @@ public class Device extends AbstractActor { } private void receiveGetTemperature(GetTemperature cmd) { + Temperature reply; if (temperatures.isEmpty()) { - getSender().tell(new Temperature(cmd.deviceId, Double.NaN, - Double.NaN, 0), getSelf()); + reply = new Temperature(cmd.deviceId, Double.NaN, Double.NaN, 0); } else { - getSender().tell(new Temperature(cmd.deviceId, average(temperatures), - temperatures.get(temperatures.size() - 1), temperatures.size()), getSelf()); + reply = new Temperature(cmd.deviceId, average(temperatures), + temperatures.get(temperatures.size() - 1), temperatures.size()); } + + if (cmd.replyTo == null) + getSender().tell(reply, getSelf()); + else + cmd.replyTo.tell(reply, getSelf()); } private double sum(List<Double> values) { diff --git a/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java b/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java index 2ada983..72d4e00 100644 --- a/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java +++ b/akka-sample-sharding-java/src/main/java/sample/sharding/Devices.java @@ -14,6 +14,7 @@ import akka.event.LoggingAdapter; import akka.cluster.sharding.ClusterSharding; import akka.cluster.sharding.ClusterShardingSettings; import akka.cluster.sharding.ShardRegion; +import akka.cluster.sharding.typed.ShardingEnvelope; import akka.pattern.Patterns; public class Devices extends AbstractActorWithTimers { @@ -33,7 +34,13 @@ public class Devices extends AbstractActorWithTimers { return String.valueOf(((Device.RecordTemperature) message).deviceId); else if (message instanceof Device.GetTemperature) return String.valueOf(((Device.GetTemperature) message).deviceId); - else + else if (message instanceof ShardingEnvelope) { + ShardingEnvelope envelope = (ShardingEnvelope) message; + if (envelope.message() instanceof Device.RecordTemperature) + return String.valueOf(((Device.RecordTemperature) envelope.message()).deviceId); + else + return null; + } else return null; } }; @@ -88,10 +95,12 @@ public class Devices extends AbstractActorWithTimers { private void receiveReadTemperatures() { for (int deviceId = 0; deviceId < numberOfDevices; deviceId++) { if (deviceId >= 40) { - CompletionStage<Object> reply = Patterns.ask(deviceRegion, new Device.GetTemperature(deviceId), Duration.ofSeconds(3)); + final int id = deviceId; + CompletionStage<Object> reply = Patterns.askWithReplyTo(deviceRegion, replyTo -> + new Device.GetTemperature(id, replyTo), Duration.ofSeconds(3)); Patterns.pipe(reply, getContext().getDispatcher()).to(getSelf()); } else { - deviceRegion.tell(new Device.GetTemperature(deviceId), getSelf()); + deviceRegion.tell(new Device.GetTemperature(deviceId, getSelf()), getSelf()); } } } diff --git a/akka-sample-sharding-java/src/main/resources/application.conf b/akka-sample-sharding-java/src/main/resources/application.conf index 903f3af..8af5c0b 100644 --- a/akka-sample-sharding-java/src/main/resources/application.conf +++ b/akka-sample-sharding-java/src/main/resources/application.conf @@ -27,3 +27,6 @@ akka { } } + +akka.cluster.sharding.number-of-shards = 100 + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
