This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-rolling-update-typed-patriknw
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 0ceac9576baa50b67de176af362656bcbfa0dea4
Author: Patrik Nordwall <[email protected]>
AuthorDate: Wed Jun 19 11:35:49 2019 +0200

    roll 2 - scala
    
    * convert actors to Typed
    * use custom ShardingMessageExtractor in ClusterSharding.init
    * use same akka.cluster.sharding.number-of-shards config as was in roll 1
---
 akka-sample-sharding-typed-scala/LICENSE           |  10 ++
 akka-sample-sharding-typed-scala/README.md         |  39 +++++++
 akka-sample-sharding-typed-scala/build.sbt         |  33 ++++++
 .../project/build.properties                       |   1 +
 .../project/plugins.sbt                            |   2 +
 .../src/main/resources/application.conf            |  31 ++++++
 .../src/main/scala/sample/sharding/Device.scala    | 113 +++++++++++++++++++++
 .../src/main/scala/sample/sharding/Devices.scala   |  70 +++++++++++++
 .../src/main/scala/sample/sharding/Message.scala   |   9 ++
 .../main/scala/sample/sharding/ShardingApp.scala   |  29 ++++++
 10 files changed, 337 insertions(+)

diff --git a/akka-sample-sharding-typed-scala/LICENSE 
b/akka-sample-sharding-typed-scala/LICENSE
new file mode 100644
index 0000000..4239f09
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/LICENSE
@@ -0,0 +1,10 @@
+Akka sample by Lightbend
+
+Licensed under Public Domain (CC0)
+
+To the extent possible under law, the person who associated CC0 with
+this Template has waived all copyright and related or neighboring
+rights to this Template.
+
+You should have received a copy of the CC0 legalcode along with this
+work.  If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
diff --git a/akka-sample-sharding-typed-scala/README.md 
b/akka-sample-sharding-typed-scala/README.md
new file mode 100644
index 0000000..bbb52b6
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/README.md
@@ -0,0 +1,39 @@
+This tutorial contains a sample illustrating [Akka Cluster 
Sharding](http://doc.akka.io/docs/akka/current/scala/cluster-sharding.html#an-example).
+
+## Example overview
+
+First of all, make sure the correct settings in 
[application.conf](src/main/resources/application.conf) are set as described in 
the akka-sample-cluster tutorial.
+
+Open [ShardingApp.scala](src/main/scala/sample/sharding/ShardingApp.scala).
+
+This small program starts an ActorSystem with Cluster Sharding enabled. It 
joins the cluster and starts a `Devices` actor. This actor starts the 
infrastructure to shard `Device` instances and starts sending messages to 
arbitrary devices.
+
+To run this sample, type `sbt "runMain sample.sharding.ShardingApp"` if it is 
not already started.
+
+`ShardingApp` starts three actor systems (cluster members) in the same JVM 
process. It can be more interesting to run them in separate processes. Stop the 
application and then open three terminal windows.
+
+In the first terminal window, start the first seed node with the following 
command:
+
+    sbt "runMain sample.sharding.ShardingApp 2551"
+
+2551 corresponds to the port of the first seed-nodes element in the 
configuration. In the log output you see that the cluster node has been started 
and changed status to 'Up'.
+
+You'll see a log message when `Devices` sends a message to record the current 
temperature, and for each of those you'll see a log message from the `Device` 
showing the action taken and the new average temperature.
+
+In the second terminal window, start the second seed node with the following 
command:
+
+    sbt "runMain sample.sharding.ShardingApp 2552"
+
+2552 corresponds to the port of the second seed-nodes element in the configuration. In the log output you see that the cluster node has been started, joins the other seed node, and becomes a member of the cluster. Its status changes to 'Up'. Switch over to the first terminal window and see in the log output that the member joined.
+
+Some of the devices that were originally on the `ActorSystem` on port 2551 
will be migrated to the newly joined `ActorSystem` on port 2552. The migration 
is straightforward: the old actor is stopped and a fresh actor is started on 
the newly created `ActorSystem`. Notice this means the average is reset: if you 
want your state to be persisted you'll need to take care of this yourself. For 
this reason Cluster Sharding and Akka Persistence are such a popular 
combination.
+
+Start another node in the third terminal window with the following command:
+
+    sbt "runMain sample.sharding.ShardingApp 0"
+
+This time you don't need to pick a specific port number: 0 means that a random available port will be used. The node joins one of the configured seed nodes. Look at the log output in the different terminal windows.
+
+Start even more nodes in the same way, if you like.
+
+Shut down one of the nodes by pressing 'ctrl-c' in one of the terminal 
windows. The other nodes will detect the failure after a while, which you can 
see in the log output in the other terminals.
diff --git a/akka-sample-sharding-typed-scala/build.sbt 
b/akka-sample-sharding-typed-scala/build.sbt
new file mode 100644
index 0000000..55e116b
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/build.sbt
@@ -0,0 +1,33 @@
+import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
+import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
+
+val akkaVersion = "2.6.0-M4"
+
+// Single-project build for the typed Cluster Sharding sample, with the
+// multi-JVM settings and configuration enabled.
+lazy val `akka-sample-sharding-typed-scala` = project
+  .in(file("."))
+  .settings(multiJvmSettings: _*)
+  .settings(
+    organization := "com.typesafe.akka.samples",
+    scalaVersion := "2.12.8",
+    scalacOptions in Compile ++= Seq(
+      "-deprecation",
+      "-feature",
+      "-unchecked",
+      "-Xlog-reflective-calls",
+      "-Xlint"
+    ),
+    javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
+    javaOptions in run ++= Seq("-Xms128m", "-Xmx1024m"),
+    libraryDependencies ++= Seq(
+      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
+      "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
+      "org.scalatest" %% "scalatest" % "3.0.7" % Test
+    ),
+    mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"),
+    // disable parallel tests
+    parallelExecution in Test := false,
+    // The URL literal must not be followed by a `;` inside the call: that does not parse.
+    licenses := Seq(
+      ("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))
+    )
+  )
+  .configs(MultiJvm)
diff --git a/akka-sample-sharding-typed-scala/project/build.properties 
b/akka-sample-sharding-typed-scala/project/build.properties
new file mode 100644
index 0000000..c0bab04
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.2.8
diff --git a/akka-sample-sharding-typed-scala/project/plugins.sbt 
b/akka-sample-sharding-typed-scala/project/plugins.sbt
new file mode 100644
index 0000000..2d02635
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/project/plugins.sbt
@@ -0,0 +1,2 @@
+addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
+addSbtPlugin("com.dwijnand" % "sbt-dynver" % "3.0.0")
diff --git 
a/akka-sample-sharding-typed-scala/src/main/resources/application.conf 
b/akka-sample-sharding-typed-scala/src/main/resources/application.conf
new file mode 100644
index 0000000..c5e13e4
--- /dev/null
+++ b/akka-sample-sharding-typed-scala/src/main/resources/application.conf
@@ -0,0 +1,31 @@
+akka {
+  loglevel = INFO
+
+  actor {
+    provider = "cluster"
+
+    serialization-bindings {
+      "sample.sharding.Message" = jackson-cbor
+    }
+  }
+
+  # For the sample, just bind to loopback and do not allow access from the network
+  # the port is overridden by the logic in main class
+  remote.artery {
+    canonical.port = 0
+    canonical.hostname = 127.0.0.1
+  }
+
+  cluster {
+    seed-nodes = [
+      "akka://ShardingSystem@127.0.0.1:2551",
+      "akka://ShardingSystem@127.0.0.1:2552"]
+
+    # auto downing is NOT safe for production deployments.
+    # you may want to use it during development, read more about it in the docs.
+    auto-down-unreachable-after = 10s
+
+    sharding.number-of-shards = 100
+  }
+
+}
diff --git 
a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala 
b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala
new file mode 100644
index 0000000..6854b4d
--- /dev/null
+++ 
b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Device.scala
@@ -0,0 +1,113 @@
+package sample.sharding
+
+import akka.actor.typed.ActorRef
+import akka.actor.typed.ActorSystem
+import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.Behaviors
+import akka.cluster.sharding.typed.HashCodeMessageExtractor
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.cluster.sharding.typed.ShardingMessageExtractor
+import akka.cluster.sharding.typed.scaladsl.ClusterSharding
+import akka.cluster.sharding.typed.scaladsl.Entity
+import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+
+/**
+  * Sharded `Device` entity that keeps the temperature readings recorded for one device.
+  *
+  * This is just an example: cluster sharding would be overkill for just keeping a small amount of data,
+  * but becomes useful when you have a collection of 'heavy' actors (in terms of processing or state)
+  * so you need to distribute them across several nodes.
+  */
+object Device {
+  /** Key under which the `Device` entity type is registered with Cluster Sharding. */
+  val TypeKey = EntityTypeKey[Device.Command]("Device")
+
+  /**
+    * Initializes sharding of `Device` entities on the given system.
+    *
+    * A custom message extractor is installed that accepts both bare
+    * `Device.Command` messages and `ShardingEnvelope`-wrapped ones.
+    *
+    * @param system the actor system on which Cluster Sharding is initialized
+    */
+  def init(system: ActorSystem[_]): Unit = {
+
+    val messageExtractor =
+      new ShardingMessageExtractor[Any, Command] {
+
+        // Note that `HashCodeMessageExtractor` is using
+        // `(math.abs(id.hashCode) % numberOfShards).toString`.
+        // If the old Untyped nodes were using a different hashing function
+        // this delegate HashCodeMessageExtractor can't be used and
+        // same hashing function as before must be implemented here.
+        // `akka.cluster.sharding.typed.HashCodeMessageExtractor` is compatible
+        // with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`.
+        val delegate = new HashCodeMessageExtractor[Device.Command](
+          system.settings.config
+            .getInt("akka.cluster.sharding.number-of-shards")
+        )
+
+        override def entityId(message: Any): String = {
+          message match {
+            case Device.RecordTemperature(deviceId, _) =>
+              deviceId.toString
+            case Device.GetTemperature(deviceId, _) =>
+              deviceId.toString
+            // The type argument is erased at runtime and cannot be verified by
+            // the pattern, hence the explicit @unchecked.
+            case env: ShardingEnvelope[Device.Command @unchecked] =>
+              delegate.entityId(env)
+            // Per the ShardingMessageExtractor contract a null entity id marks
+            // the message as unhandled, instead of failing with a MatchError.
+            case _ =>
+              null
+          }
+        }
+
+        override def shardId(entityId: String): String = {
+          delegate.shardId(entityId)
+        }
+
+        override def unwrapMessage(message: Any): Command = {
+          message match {
+            case m: Device.RecordTemperature => m
+            case m: Device.GetTemperature    => m
+            // Same envelope type as in `entityId`: any `Device.Command` may be
+            // wrapped, and the delegate expects exactly this envelope type.
+            case env: ShardingEnvelope[Device.Command @unchecked] =>
+              delegate.unwrapMessage(env)
+          }
+        }
+      }
+
+    ClusterSharding(system).init(
+      Entity(TypeKey, _ => Device())
+        .withMessageExtractor(messageExtractor)
+    )
+  }
+
+  /** Messages understood by a `Device` entity. */
+  sealed trait Command extends Message
+
+  /** Records one temperature reading for the device with the given id. */
+  case class RecordTemperature(deviceId: Int, temperature: Double)
+      extends Command
+
+  /** Asks the device for its current [[Temperature]] summary, sent to `replyTo`. */
+  case class GetTemperature(deviceId: Int, replyTo: ActorRef[Temperature])
+      extends Command
+
+  /**
+    * Reply to [[GetTemperature]]. `average` and `latest` are `Double.NaN` and
+    * `readings` is 0 when nothing has been recorded yet.
+    */
+  case class Temperature(deviceId: Int,
+                         average: Double,
+                         latest: Double,
+                         readings: Int)
+      extends Message
+
+  /** Initial behavior of a device: no readings recorded yet. */
+  def apply(): Behavior[Command] =
+    counting(Vector.empty)
+
+  /**
+    * Behavior holding all readings recorded so far in `values`; each recorded
+    * temperature yields a new behavior with the reading appended.
+    */
+  private def counting(values: Vector[Double]): Behavior[Command] = {
+    Behaviors.receive { (context, cmd) =>
+      cmd match {
+        case RecordTemperature(id, temp) =>
+          val temperatures = values :+ temp
+          context.log.info(
+            s"Recording temperature $temp for device $id, average is ${average(temperatures)} after " +
+              s"${temperatures.size} readings"
+          )
+          counting(temperatures)
+
+        case GetTemperature(id, replyTo) =>
+          val reply =
+            if (values.isEmpty)
+              Temperature(id, Double.NaN, Double.NaN, 0)
+            else
+              Temperature(id, average(values), values.last, values.size)
+          replyTo ! reply
+          Behaviors.same
+      }
+    }
+  }
+
+  /** Arithmetic mean of `values`, or `Double.NaN` when there are none. */
+  private def average(values: Vector[Double]): Double =
+    if (values.isEmpty) Double.NaN
+    else values.sum / values.size
+}
diff --git 
a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Devices.scala 
b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Devices.scala
new file mode 100644
index 0000000..60c353b
--- /dev/null
+++ 
b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Devices.scala
@@ -0,0 +1,70 @@
+package sample.sharding
+
+import scala.concurrent.duration._
+import scala.util.Random
+
+import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.Behaviors
+import akka.cluster.sharding.typed.scaladsl.ClusterSharding
+
+/**
+  * Driver actor for the sample: initializes `Device` sharding, then on timers
+  * records random temperatures and reads back the temperature of every device.
+  */
+object Devices {
+  sealed trait Command
+
+  // Timer tick: record one random reading for one randomly chosen device.
+  private case object UpdateDevice extends Command
+
+  // Timer tick: ask every device for its current temperature.
+  private case object ReadTemperatures extends Command
+
+  // Wraps a `Device.Temperature` reply so that it fits this actor's own protocol.
+  private case class GetTemperatureReply(temp: Device.Temperature)
+      extends Command
+
+  /** Behavior that sets up sharding and the two timers, then handles the ticks and replies. */
+  def apply(): Behavior[Command] = {
+    Behaviors.setup { context =>
+      Device.init(context.system)
+      val sharding = ClusterSharding(context.system)
+
+      Behaviors.withTimers { timers =>
+        val random = new Random()
+        val numberOfDevices = 50
+
+        timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 1.second)
+        timers.startTimerWithFixedDelay(
+          ReadTemperatures,
+          ReadTemperatures,
+          15.seconds
+        )
+
+        val temperatureAdapter =
+          context.messageAdapter[Device.Temperature](GetTemperatureReply(_))
+
+        Behaviors.receiveMessage {
+          case UpdateDevice =>
+            val deviceId = random.nextInt(numberOfDevices)
+            val temperature = 5 + 30 * random.nextDouble()
+            val msg = Device.RecordTemperature(deviceId, temperature)
+            context.log.info(s"Sending $msg")
+            sharding.entityRefFor(Device.TypeKey, deviceId.toString) ! msg
+            Behaviors.same
+
+          case ReadTemperatures =>
+            // Device ids are drawn with `random.nextInt(numberOfDevices)`, i.e. from
+            // 0 (inclusive) to numberOfDevices (exclusive), so query exactly that range.
+            (0 until numberOfDevices).foreach { deviceId =>
+              val entityRef =
+                sharding.entityRefFor(Device.TypeKey, deviceId.toString)
+              entityRef ! Device.GetTemperature(deviceId, temperatureAdapter)
+            }
+            Behaviors.same
+
+          case GetTemperatureReply(temp: Device.Temperature) =>
+            // Devices without any readings are not logged.
+            if (temp.readings > 0)
+              context.log.info(
+                "Temperature of device {} is {} with average {} after {} readings",
+                temp.deviceId,
+                temp.latest,
+                temp.average,
+                temp.readings
+              )
+            Behaviors.same
+        }
+      }
+    }
+  }
+}
diff --git 
a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Message.scala 
b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Message.scala
new file mode 100644
index 0000000..84baedc
--- /dev/null
+++ 
b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/Message.scala
@@ -0,0 +1,9 @@
+/**
+  * Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
+  */
+package sample.sharding
+
+/**
+  * Marker interface for actor messages that are serialized.
+  *
+  * The sample's `application.conf` lists `sample.sharding.Message` under
+  * `akka.actor.serialization-bindings` with the `jackson-cbor` serializer;
+  * `Device.Command` and `Device.Temperature` extend this trait.
+  */
+trait Message
diff --git 
a/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/ShardingApp.scala
 
b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/ShardingApp.scala
new file mode 100644
index 0000000..cff9352
--- /dev/null
+++ 
b/akka-sample-sharding-typed-scala/src/main/scala/sample/sharding/ShardingApp.scala
@@ -0,0 +1,29 @@
+package sample.sharding
+
+import akka.actor.typed.ActorSystem
+import com.typesafe.config.ConfigFactory
+
+/**
+  * Entry point of the sample: starts one `ShardingSystem` actor system per port.
+  */
+object ShardingApp {
+
+  /** Uses the ports given as arguments, or 2551, 2552 and 0 when none are given. */
+  def main(args: Array[String]): Unit = {
+    val ports: Seq[String] =
+      if (args.nonEmpty) args
+      else Seq("2551", "2552", "0")
+    startup(ports)
+  }
+
+  /** Starts an actor system running `Devices` for each of the given ports. */
+  def startup(ports: Seq[String]): Unit =
+    // In a production application you wouldn't typically start multiple ActorSystem
+    // instances in the same JVM, here we do it to easily demonstrate these
+    // ActorSystems (which would be in separate JVM's) talking to each other.
+    for (port <- ports) {
+      // Override the configuration of the port
+      val portOverride =
+        ConfigFactory.parseString(s"akka.remote.artery.canonical.port=$port")
+      val config = portOverride.withFallback(ConfigFactory.load())
+
+      // Create an Akka system, with Devices actor that starts the sharding
+      // and sends random messages
+      ActorSystem(Devices(), "ShardingSystem", config)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to