This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-chbatey-cqrs-testing in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 3cf2ab6fd4bf3850288119f1256b7bf8a6877e57 Author: Christopher Batey <[email protected]> AuthorDate: Fri Oct 2 15:10:52 2020 +0100 Basic testing --- akka-sample-cqrs-scala/README.md | 66 +------ akka-sample-cqrs-scala/build.sbt | 14 +- akka-sample-cqrs-scala/projection.sql | 21 ++ .../src/main/resources/application.conf | 43 +++-- .../projection/testing}/CborSerializable.scala | 2 +- .../testing/ConfigurablePersistentActor.scala | 47 +++++ .../testing}/EventProcessorSettings.scala | 7 +- .../scala/akka/projection/testing/Guardian.scala | 87 +++++++++ .../akka/projection/testing/HikariFactory.scala | 9 + .../projection/testing/HikariJdbcSession.scala | 23 +++ .../projection/testing/HttpServer.scala} | 15 +- .../akka/projection/testing/LoadGeneration.scala | 85 ++++++++ .../main/scala/akka/projection/testing/Main.scala | 33 ++++ .../projection/testing/ProjectionHandler.scala | 19 ++ .../scala/akka/projection/testing/TestRoutes.scala | 30 +++ .../akka/projection/testing/TestValidation.scala | 45 +++++ .../src/main/scala/sample/cqrs/Main.scala | 153 --------------- .../src/main/scala/sample/cqrs/ShoppingCart.scala | 215 --------------------- .../cqrs/ShoppingCartProjectionHandler.scala | 27 --- .../scala/sample/cqrs/ShoppingCartRoutes.scala | 110 ----------- .../src/test/resources/logback-test.xml | 17 -- .../test/scala/sample/cqrs/IntegrationSpec.scala | 198 ------------------- .../test/scala/sample/cqrs/ProjectionSpec.scala | 87 --------- .../test/scala/sample/cqrs/ShoppingCartSpec.scala | 86 --------- 24 files changed, 450 insertions(+), 989 deletions(-) diff --git a/akka-sample-cqrs-scala/README.md b/akka-sample-cqrs-scala/README.md index 77ec65b..52ee6d2 100644 --- a/akka-sample-cqrs-scala/README.md +++ b/akka-sample-cqrs-scala/README.md @@ -1,66 +1,2 @@ -This tutorial contains a sample illustrating an CQRS design with [Akka Cluster Sharding](https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html), [Akka Cluster 
Singleton](https://doc.akka.io/docs/akka/2.6/typed/cluster-singleton.html), [Akka Persistence](https://doc.akka.io/docs/akka/2.6/typed/persistence.html) and [Akka Persistence Query](https://doc.akka.io/docs/akka/2.6/persistence-query.html). +# Projection latency testbed -## Overview - -This sample application implements a CQRS-ES design that will side-effect in the read model on selected events persisted to Cassandra by the write model. In this sample, the side-effect is logging a line. -A more practical example would be to send a message to a Kafka topic or update a relational database. - -## Write model - -The write model is a shopping cart. - -The implementation is based on a sharded actor: each `ShoppingCart` is an [Akka Cluster Sharding](https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html) entity. The entity actor `ShoppingCart` is an [EventSourcedBehavior](https://doc.akka.io/docs/akka/2.6/typed/persistence.html). - -Events from the shopping carts are tagged and consumed by the read model. - -## Read model - -The read model is implemented in such a way that 'load' is sharded over a number of processors. This number is `event-processor.parallelism`. -This is implemented using [Akka Projections](https://doc.akka.io/docs/akka-projection/current) which is then running on top of - [Sharded Daemon Process](https://doc.akka.io/docs/akka/current/typed/cluster-sharded-daemon-process.html). - - -## Running the sample code - -1. Start a Cassandra server by running: - -``` -sbt "runMain sample.cqrs.Main cassandra" -``` - -2. Start a node that runs the write model: - -``` -sbt -Dakka.cluster.roles.0=write-model "runMain sample.cqrs.Main 2551" -``` - -3. Start a node that runs the read model: - -``` -sbt -Dakka.cluster.roles.0=read-model "runMain sample.cqrs.Main 2552" -``` - -4. 
More write or read nodes can be started started by defining roles and port: - -``` -sbt -Dakka.cluster.roles.0=write-model "runMain sample.cqrs.Main 2553" -sbt -Dakka.cluster.roles.0=read-model "runMain sample.cqrs.Main 2554" -``` - -Try it with curl: - -``` -# add item to cart -curl -X POST -H "Content-Type: application/json" -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' http://127.0.0.1:8051/shopping/carts - -# get cart -curl http://127.0.0.1:8051/shopping/carts/cart1 - -# update quantity of item -curl -X PUT -H "Content-Type: application/json" -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' http://127.0.0.1:8051/shopping/carts - -# check out cart -curl -X POST -H "Content-Type: application/json" -d '{}' http://127.0.0.1:8051/shopping/carts/cart1/checkout -``` - -or same `curl` commands to port 8052. diff --git a/akka-sample-cqrs-scala/build.sbt b/akka-sample-cqrs-scala/build.sbt index 43cb70d..a67e25b 100644 --- a/akka-sample-cqrs-scala/build.sbt +++ b/akka-sample-cqrs-scala/build.sbt @@ -1,12 +1,12 @@ -val AkkaVersion = "2.6.8" +val AkkaVersion = "2.6.9" val AkkaPersistenceCassandraVersion = "1.0.1" val AkkaHttpVersion = "10.2.0" -val AkkaProjectionVersion = "0.3" +val AkkaProjectionVersion = "1.0.0" -lazy val `akka-sample-cqrs-scala` = project +lazy val `akka-projection-testing` = project .in(file(".")) .settings( - organization := "com.lightbend.akka.samples", + organization := "akka.projection.testing", version := "1.0", scalaVersion := "2.13.1", scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint"), @@ -20,19 +20,23 @@ lazy val `akka-sample-cqrs-scala` = project "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % AkkaPersistenceCassandraVersion, "com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion, "com.lightbend.akka" %% "akka-projection-cassandra" % AkkaProjectionVersion, + "com.lightbend.akka" %% "akka-projection-jdbc" % AkkaProjectionVersion, 
+ "com.zaxxer" % "HikariCP" % "3.4.5", "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion, "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion, "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion, "ch.qos.logback" % "logback-classic" % "1.2.3", + "org.postgresql" % "postgresql" % "42.2.14", "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test, "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test, "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test, "com.lightbend.akka" %% "akka-projection-testkit" % AkkaProjectionVersion % Test, + "org.scalatest" %% "scalatest" % "3.1.0" % Test, "commons-io" % "commons-io" % "2.4" % Test), fork in run := false, Global / cancelable := false, // ctrl-c - mainClass in (Compile, run) := Some("sample.cqrs.Main"), + mainClass in (Compile, run) := Some("akka.projection.testing.Main"), // disable parallel tests parallelExecution in Test := false, // show full stack traces and test case durations diff --git a/akka-sample-cqrs-scala/projection.sql b/akka-sample-cqrs-scala/projection.sql new file mode 100644 index 0000000..154a945 --- /dev/null +++ b/akka-sample-cqrs-scala/projection.sql @@ -0,0 +1,21 @@ + +create table if not exists "AKKA_PROJECTION_OFFSET_STORE" ( + "PROJECTION_NAME" VARCHAR(255) NOT NULL, + "PROJECTION_KEY" VARCHAR(255) NOT NULL, + "CURRENT_OFFSET" VARCHAR(255) NOT NULL, + "MANIFEST" VARCHAR(4) NOT NULL, + "MERGEABLE" BOOLEAN NOT NULL, + "LAST_UPDATED" BIGINT NOT NULL, + constraint "PK_PROJECTION_ID" primary key ("PROJECTION_NAME","PROJECTION_KEY") +); + +create index if not exists "PROJECTION_NAME_INDEX" on "AKKA_PROJECTION_OFFSET_STORE" ("PROJECTION_NAME"); + +create table if not exists events ( + name varchar(256), + event varchar(256), + constraint pkey primary key (name, event) +); + + + diff --git a/akka-sample-cqrs-scala/src/main/resources/application.conf b/akka-sample-cqrs-scala/src/main/resources/application.conf index 9bcb675..da92bcf 100644 --- 
a/akka-sample-cqrs-scala/src/main/resources/application.conf +++ b/akka-sample-cqrs-scala/src/main/resources/application.conf @@ -1,11 +1,29 @@ akka { + + projection { + jdbc { + dialect = "postgres-dialect" + offset-store { + schema = "" + table = "AKKA_PROJECTION_OFFSET_STORE" + } + blocking-jdbc-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 10 + } + } + } + } + loglevel = DEBUG actor { provider = cluster serialization-bindings { - "sample.cqrs.CborSerializable" = jackson-cbor + "akka.projection.testing.CborSerializable" = jackson-cbor } } @@ -18,8 +36,8 @@ akka { cluster { seed-nodes = [ - "akka://Shopping@127.0.0.1:2551", - "akka://Shopping@127.0.0.1:2552" + "akka://test@127.0.0.1:2551", + "akka://test@127.0.0.1:2552" ] roles = ["write-model", "read-model"] @@ -37,13 +55,17 @@ akka { # Configuration for akka-persistence-cassandra akka.persistence.cassandra { + journal { + keyspace = "akka_testing" + } + events-by-tag { - bucket-size = "Day" + bucket-size = "Hour" # for reduced latency eventual-consistency-delay = 200ms flush-interval = 50ms pubsub-notification = on - first-time-bucket = "20200115T00:00" + first-time-bucket = "20201001T00:00" } query { @@ -61,14 +83,13 @@ datastax-java-driver { advanced.reconnect-on-init = on } -akka.projection.cassandra.offset-store.keyspace = "akka_cqrs_sample" - event-processor { - tag-prefix = "carts-slice" // even processor tag prefix - parallelism = 4 // number of event processors + parallelism = 4 +} + +test { + } -shopping.http.port = 0 -shopping.askTimeout = 5 s diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala similarity index 72% rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala index 
cbec29e..a3a147e 100644 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/CborSerializable.scala +++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/CborSerializable.scala @@ -1,4 +1,4 @@ -package sample.cqrs +package akka.projection.testing /** * Marker trait for serialization with Jackson CBOR diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala new file mode 100644 index 0000000..579d208 --- /dev/null +++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ConfigurablePersistentActor.scala @@ -0,0 +1,47 @@ +package akka.projection.testing + +import akka.Done +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ActorRef, ActorSystem, Behavior} +import akka.cluster.sharding.typed.ShardingEnvelope +import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey} +import akka.pattern.StatusReply +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior} + +object ConfigurablePersistentActor { + + val Key: EntityTypeKey[Command] = EntityTypeKey[Command]("configurable") + + def init(settings: EventProcessorSettings, system: ActorSystem[_]): ActorRef[ShardingEnvelope[Command]] = { + ClusterSharding(system).init(Entity(Key)(ctx => apply(settings, ctx.entityId)) + .withRole("write-model")) + } + + trait Command + + final case class PersistAndAck(toPersist: String, replyTo: ActorRef[StatusReply[Done]], testName: String) extends Command + + final case class Persist(toPersist: String, testName: String) extends Command + + final case class Event(testName: String, payload: String, timeCreated: Long = System.currentTimeMillis()) extends CborSerializable + + final case class State(eventsProcessed: Long) extends CborSerializable + + def apply(settings: EventProcessorSettings, persistenceId: String): 
Behavior[Command] = + Behaviors.setup { ctx => + EventSourcedBehavior[Command, Event, State]( + persistenceId = PersistenceId.ofUniqueId(persistenceId), + State(0), + (_, command) => command match { + case Persist(toPersist, testName) => + Effect.persist(Event(testName, toPersist)) + case PersistAndAck(toPersist, ack, testName) => + ctx.log.info("persisting event {}", command) + Effect.persist(Event(testName, toPersist)).thenRun(_ => ack ! StatusReply.ack()) + }, + (state, _) => state.copy(eventsProcessed = state.eventsProcessed + 1)).withTagger(event => + Set("tag-" + math.abs(event.hashCode() % settings.parallelism))) + } + +} diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala similarity index 63% rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala index 774a221..69430ec 100644 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/EventProcessorSettings.scala +++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/EventProcessorSettings.scala @@ -1,4 +1,4 @@ -package sample.cqrs +package akka.projection.testing import akka.actor.typed.ActorSystem import com.typesafe.config.Config @@ -10,10 +10,9 @@ object EventProcessorSettings { } def apply(config: Config): EventProcessorSettings = { - val tagPrefix: String = config.getString("tag-prefix") val parallelism: Int = config.getInt("parallelism") - EventProcessorSettings(tagPrefix, parallelism) + EventProcessorSettings(parallelism) } } -final case class EventProcessorSettings(tagPrefix: String, parallelism: Int) +final case class EventProcessorSettings(parallelism: Int) diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala 
new file mode 100644 index 0000000..c662600 --- /dev/null +++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Guardian.scala @@ -0,0 +1,87 @@ +package akka.projection.testing + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ActorRef, ActorSystem, Behavior} +import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess +import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardedDaemonProcessSettings, ShardingEnvelope} +import akka.cluster.typed.Cluster +import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal +import akka.persistence.query.Offset +import akka.projection.eventsourced.EventEnvelope +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.jdbc.scaladsl.JdbcProjection +import akka.projection.scaladsl.ExactlyOnceProjection +import akka.projection.testing.LoadGeneration.{Result, RunTest} +import akka.projection.{ProjectionBehavior, ProjectionId} +import akka.util.Timeout +import com.zaxxer.hikari.{HikariConfig, HikariDataSource} + +import scala.concurrent.duration.DurationInt +import scala.util.{Failure, Success} + +object Guardian { + + def createProjectionFor( + settings: EventProcessorSettings, + index: Int, + factory: HikariFactory + )(implicit system: ActorSystem[_]): ExactlyOnceProjection[Offset, EventEnvelope[ConfigurablePersistentActor.Event]] = { + val tag = s"tag-$index" + val sourceProvider = EventSourcedProvider.eventsByTag[ConfigurablePersistentActor.Event]( + system = system, + readJournalPluginId = CassandraReadJournal.Identifier, + tag = tag) + JdbcProjection.exactlyOnce( + projectionId = ProjectionId("test-projection-id", tag), + sourceProvider, + () => factory.newSession(), + () => new ProjectionHandler(tag, system) + ) + } + + def apply(): Behavior[String] = { + Behaviors.setup[String] { context => + implicit val system: ActorSystem[_] = context.system + val config = new HikariConfig + config.setJdbcUrl("jdbc:postgresql://127.0.0.1:5432/") 
+ config.setUsername("docker") + config.setPassword("docker") + config.setMaximumPoolSize(19) + config.setAutoCommit(false) + val dataSource = new HikariDataSource(config) + val settings = EventProcessorSettings(system) + val shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]] = ConfigurablePersistentActor.init(settings, system) + if (Cluster(system).selfMember.hasRole("read-model")) { + + val dbSessionFactory = new HikariFactory(dataSource) + + // we only want to run the daemon processes on the read-model nodes + val shardingSettings = ClusterShardingSettings(system) + val shardedDaemonProcessSettings = + ShardedDaemonProcessSettings(system).withShardingSettings(shardingSettings.withRole("read-model")) + + ShardedDaemonProcess(system).init( + name = "test-projection", + settings.parallelism, + n => ProjectionBehavior(createProjectionFor(settings, n, dbSessionFactory)), + shardedDaemonProcessSettings, + Some(ProjectionBehavior.Stop)) + } + + // TODO move to route + implicit val timeout: Timeout = 10.seconds + val loadGeneration: ActorRef[LoadGeneration.RunTest] = context.spawn(LoadGeneration(shardRegion, dataSource), "load-generation") + context.ask[RunTest, Result](loadGeneration, replyTo => LoadGeneration.RunTest(s"test-${System.currentTimeMillis()}", 2, 5, replyTo)) { + case Success(value) => + context.log.info("Test passed {}", value) + "success" + case Failure(t) => + context.log.error("Test failed",t ) + "failure" + } + + + Behaviors.empty + } + } +} diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala new file mode 100644 index 0000000..742a44b --- /dev/null +++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariFactory.scala @@ -0,0 +1,9 @@ +package akka.projection.testing + +import javax.sql.DataSource + +class HikariFactory(val dataSource: DataSource) { + def newSession(): 
HikariJdbcSession = { + new HikariJdbcSession(dataSource) + } +} diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala new file mode 100644 index 0000000..057b8d1 --- /dev/null +++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HikariJdbcSession.scala @@ -0,0 +1,23 @@ +package akka.projection.testing + +import java.sql.Connection + +import akka.japi.function +import akka.projection.jdbc.JdbcSession +import javax.sql.DataSource + + + +class HikariJdbcSession(source: DataSource) extends JdbcSession { + + private val connection = source.getConnection + + override def withConnection[Result](func: function.Function[Connection, Result]): Result = + func(connection) + + override def commit(): Unit = connection.commit() + + override def rollback(): Unit = connection.rollback() + + override def close(): Unit = connection.close() +} diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala similarity index 59% rename from akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala rename to akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala index 4a9c313..2444cee 100644 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartServer.scala +++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/HttpServer.scala @@ -1,18 +1,13 @@ -package sample.cqrs +package akka.projection.testing -import scala.concurrent.duration._ -import scala.util.Failure -import scala.util.Success - -import akka.actor.CoordinatedShutdown import akka.actor.typed.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.server.Route -import akka.Done -class ShoppingCartServer(routes: Route, port: Int)(implicit system: ActorSystem[_]) { - private val shutdown = 
CoordinatedShutdown(system) +import scala.concurrent.duration._ +import scala.util.{Failure, Success} +class HttpServer(routes: Route, port: Int)(implicit system: ActorSystem[_]) { import system.executionContext def start(): Unit = { @@ -21,7 +16,7 @@ class ShoppingCartServer(routes: Route, port: Int)(implicit system: ActorSystem[ .onComplete { case Success(binding) => val address = binding.localAddress - system.log.info("Shopping online at http://{}:{}/", address.getHostString, address.getPort) + system.log.info("Online at http://{}:{}/", address.getHostString, address.getPort) case Failure(ex) => system.log.error("Failed to bind HTTP endpoint, terminating system", ex) diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala new file mode 100644 index 0000000..35fb682 --- /dev/null +++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/LoadGeneration.scala @@ -0,0 +1,85 @@ +package akka.projection.testing + +import akka.{Done, NotUsed} +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ActorRef, Behavior, Terminated} +import akka.cluster.sharding.typed.ShardingEnvelope +import akka.pattern.StatusReply +import akka.projection.testing.LoadGeneration.{Failed, Result, RunTest} +import akka.projection.testing.LoadTest.Start +import akka.stream.scaladsl.Source +import akka.util.Timeout +import javax.sql.DataSource + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.DurationInt +import scala.util.{Failure, Success} + +object LoadGeneration { + + case class RunTest(name: String, actors: Int, eventsPerActor: Int, reply: ActorRef[Result]) + + sealed trait Result + + case class Pass() extends Result + + case class Failed(t: Option[Throwable], expected: Int, got: Int) extends Result + + def apply(shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]], source: 
DataSource): Behavior[RunTest] = Behaviors.setup { ctx => + Behaviors.receiveMessage[RunTest] { + case rt@RunTest(name, actors, eventsPerActor, reply) => + ctx.spawn(LoadTest(name, shardRegion, source), s"test-$name") ! Start(rt) + Behaviors.same + } + } + +} + +object LoadTest { + + sealed trait Command + + case class Start(test: RunTest) extends Command + + private case class StartValidation() extends Command + + private case class LoadGenerationFailed(t: Throwable) extends Command + + def apply(testName: String, shardRegion: ActorRef[ShardingEnvelope[ConfigurablePersistentActor.Command]], source: DataSource): Behavior[Command] = Behaviors.setup { ctx => + import akka.actor.typed.scaladsl.AskPattern._ + implicit val timeout: Timeout = 30.seconds + implicit val system = ctx.system + implicit val ec: ExecutionContext = system.executionContext + Behaviors.receiveMessage[Command] { + case Start(RunTest(_, actors, eventsPerActor, replyTo)) => + ctx.log.info("Starting load generation") + val expected = actors * eventsPerActor + val testRun: Source[StatusReply[Done], NotUsed] = Source(1 to actors) + .flatMapConcat(id => + Source(1 to eventsPerActor) + .mapAsync(1)(message => shardRegion.ask[StatusReply[Done]] { replyTo => + ShardingEnvelope(s"$id", ConfigurablePersistentActor.PersistAndAck(s"actor-$id-message-$message", replyTo, testName)) + })) + ctx.pipeToSelf(testRun.run()) { + case Success(_) => StartValidation() + case Failure(t) => LoadGenerationFailed(t) + } + Behaviors.receiveMessage[Command] { + case StartValidation() => + ctx.log.info("Starting validation") + val validation = ctx.spawn(TestValidation(replyTo, testName, expected, source: DataSource), s"TestValidation=$testName") + ctx.watch(validation) + Behaviors.same + case LoadGenerationFailed(t) => + ctx.log.error("Load generation failed", t) + replyTo ! 
Failed(Some(t), -1, -1) + Behaviors.stopped + }.receiveSignal { + case (ctx, Terminated(_)) => + ctx.log.info("Validation finished, terminating") + Behaviors.stopped + } + } + } + +} diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala new file mode 100644 index 0000000..09690e3 --- /dev/null +++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/Main.scala @@ -0,0 +1,33 @@ +package akka.projection.testing + +import akka.actor.typed.ActorSystem +import com.typesafe.config.{Config, ConfigFactory} + +object Main { + + def main(args: Array[String]): Unit = { + args.headOption match { + + case Some(portString) if portString.matches("""\d+""") => + val port = portString.toInt + val httpPort = ("80" + portString.takeRight(2)).toInt + startNode(port, httpPort) + + case None => + throw new IllegalArgumentException("port number required argument") + } + } + + def startNode(port: Int, httpPort: Int): Unit = { + ActorSystem[String](Guardian(), "test", config(port, httpPort)) + + } + + def config(port: Int, httpPort: Int): Config = + ConfigFactory.parseString( + s""" + akka.remote.artery.canonical.port = $port + test.http.port = $httpPort + """).withFallback(ConfigFactory.load()) + +} diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala new file mode 100644 index 0000000..bf8e7a0 --- /dev/null +++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/ProjectionHandler.scala @@ -0,0 +1,19 @@ +package akka.projection.testing + +import akka.actor.typed.ActorSystem +import akka.projection.eventsourced.EventEnvelope +import akka.projection.jdbc.scaladsl.JdbcHandler +import org.slf4j.{Logger, LoggerFactory} + +class ProjectionHandler(tag: String, system: ActorSystem[_]) + extends 
JdbcHandler[EventEnvelope[ConfigurablePersistentActor.Event], HikariJdbcSession] { + private val log: Logger = LoggerFactory.getLogger(getClass) + + override def process(session: HikariJdbcSession, envelope: EventEnvelope[ConfigurablePersistentActor.Event]): Unit = { + log.info("Event {} for tag {}", envelope.event.payload, tag) + session.withConnection(connection => + connection.createStatement() + .execute(s"insert into events(name, event) values ('${envelope.event.testName}','${envelope.event.payload}')") + ) + } +} diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala new file mode 100644 index 0000000..a1b40ce --- /dev/null +++ b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestRoutes.scala @@ -0,0 +1,30 @@ +package akka.projection.testing + +import akka.actor.typed.ActorSystem +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.Route +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import spray.json.DefaultJsonProtocol._ +import spray.json.RootJsonFormat + +object TestRoutes { + case class RunTest(name: String, nrActors: Long, messagesPerActor: Long) + case class TestResult(pass: Boolean, expected: Long, got: Long) + + implicit val runTestFormat: RootJsonFormat[RunTest] = jsonFormat3(RunTest) + implicit val testResultFormat: RootJsonFormat[TestResult] = jsonFormat3(TestResult) +} + +//class TestRoutes()(implicit val system: ActorSystem[_]) { +// import TestRoutes._ +// val route: Route = path("test") { +// post { +// entity(as[RunTest]) { runTest => +// +// +// } +// } +// } +// +//} diff --git a/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala new file mode 100644 index 0000000..0976a82 --- /dev/null +++ 
b/akka-sample-cqrs-scala/src/main/scala/akka/projection/testing/TestValidation.scala @@ -0,0 +1,45 @@ +package akka.projection.testing + +import akka.actor.typed.{ActorRef, Behavior} +import akka.actor.typed.scaladsl.Behaviors +import akka.projection.testing.LoadGeneration.{Pass, Result} +import javax.sql.DataSource + +object TestValidation { + // FIXME blocking, dispatcher + // FIXME timeout + def apply(replyTo: ActorRef[Result], testName: String, expectedNrEvents: Long, source: DataSource): Behavior[String] = { + import scala.concurrent.duration._ + Behaviors.setup { ctx => + def validate(): Boolean = { + val connection = source.getConnection + try { + val resultSet = connection.createStatement().executeQuery(s"select count(*) from events where name = '$testName'") + if (resultSet.next()) { + val count = resultSet.getInt("count") + ctx.log.info("Expected {} got {}!", expectedNrEvents, count) + expectedNrEvents == count + } else { + throw new RuntimeException("Expected single row") + } + } finally { + connection.close() + } + } + + Behaviors.withTimers { timers => + timers.startTimerAtFixedRate("test", 2.seconds) + Behaviors.receiveMessage { + case "test" => + if (validate()) { + ctx.log.info("Validated. Stopping") + replyTo ! 
Pass() + Behaviors.stopped + } else { + Behaviors.same + } + } + } + } + } +} diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala deleted file mode 100644 index 795d407..0000000 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/Main.scala +++ /dev/null @@ -1,153 +0,0 @@ -package sample.cqrs - -import java.io.File -import java.util.concurrent.CountDownLatch - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.typed.ActorSystem -import akka.actor.typed.Behavior -import akka.actor.typed.scaladsl.Behaviors -import akka.cluster.sharding.typed.{ ClusterShardingSettings, ShardedDaemonProcessSettings } -import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess -import akka.cluster.typed.Cluster -import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal -import akka.persistence.cassandra.testkit.CassandraLauncher -import akka.persistence.query.Offset -import akka.projection.{ ProjectionBehavior, ProjectionId } -import akka.projection.scaladsl.AtLeastOnceProjection -import akka.projection.cassandra.scaladsl.CassandraProjection -import akka.projection.eventsourced.EventEnvelope -import akka.projection.eventsourced.scaladsl.EventSourcedProvider -import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory - -object Main { - - def main(args: Array[String]): Unit = { - args.headOption match { - - case Some(portString) if portString.matches("""\d+""") => - val port = portString.toInt - val httpPort = ("80" + portString.takeRight(2)).toInt - startNode(port, httpPort) - - case Some("cassandra") => - startCassandraDatabase() - println("Started Cassandra, press Ctrl + C to kill") - new CountDownLatch(1).await() - - case None => - throw new IllegalArgumentException("port number, or cassandra required argument") - } - } - - def startNode(port: Int, 
httpPort: Int): Unit = { - val system = - ActorSystem[Nothing](Guardian(), "Shopping", config(port, httpPort)) - - if (Cluster(system).selfMember.hasRole("read-model")) - createTables(system) - } - - def config(port: Int, httpPort: Int): Config = - ConfigFactory.parseString(s""" - akka.remote.artery.canonical.port = $port - shopping.http.port = $httpPort - """).withFallback(ConfigFactory.load()) - - /** - * To make the sample easier to run we kickstart a Cassandra instance to - * act as the journal. Cassandra is a great choice of backend for Akka Persistence but - * in a real application a pre-existing Cassandra cluster should be used. - */ - def startCassandraDatabase(): Unit = { - val databaseDirectory = new File("target/cassandra-db") - CassandraLauncher.start(databaseDirectory, CassandraLauncher.DefaultTestConfigResource, clean = false, port = 9042) - } - - def createTables(system: ActorSystem[_]): Unit = { - val session = - CassandraSessionRegistry(system).sessionFor("alpakka.cassandra") - - // TODO use real replication strategy in real application - val keyspaceStmt = - """ - CREATE KEYSPACE IF NOT EXISTS akka_cqrs_sample - WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } - """ - - val offsetTableStmt = - """ - CREATE TABLE IF NOT EXISTS akka_cqrs_sample.offset_store ( - projection_name text, - partition int, - projection_key text, - offset text, - manifest text, - last_updated timestamp, - PRIMARY KEY ((projection_name, partition), projection_key) - ) - """ - - // ok to block here, main thread - Await.ready(session.executeDDL(keyspaceStmt), 30.seconds) - system.log.info("Created akka_cqrs_sample keyspace") - Await.ready(session.executeDDL(offsetTableStmt), 30.seconds) - system.log.info("Created akka_cqrs_sample.offset_store table") - - } - -} - -object Guardian { - - def createProjectionFor( - system: ActorSystem[_], - settings: EventProcessorSettings, - index: Int): AtLeastOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] 
= { - val tag = s"${settings.tagPrefix}-$index" - val sourceProvider = EventSourcedProvider.eventsByTag[ShoppingCart.Event]( - system = system, - readJournalPluginId = CassandraReadJournal.Identifier, - tag = tag) - CassandraProjection.atLeastOnce( - projectionId = ProjectionId("shopping-carts", tag), - sourceProvider, - handler = () => new ShoppingCartProjectionHandler(tag, system)) - } - - def apply(): Behavior[Nothing] = { - Behaviors.setup[Nothing] { context => - val system = context.system - - val settings = EventProcessorSettings(system) - - val httpPort = context.system.settings.config.getInt("shopping.http.port") - - ShoppingCart.init(system, settings) - - if (Cluster(system).selfMember.hasRole("read-model")) { - - // we only want to run the daemon processes on the read-model nodes - val shardingSettings = ClusterShardingSettings(system) - val shardedDaemonProcessSettings = - ShardedDaemonProcessSettings(system).withShardingSettings(shardingSettings.withRole("read-model")) - - ShardedDaemonProcess(system).init( - name = "ShoppingCartProjection", - settings.parallelism, - n => ProjectionBehavior(createProjectionFor(system, settings, n)), - shardedDaemonProcessSettings, - Some(ProjectionBehavior.Stop)) - } - - val routes = new ShoppingCartRoutes()(context.system) - new ShoppingCartServer(routes.shopping, httpPort)(context.system).start() - - Behaviors.empty - } - } -} diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala deleted file mode 100644 index c800f0b..0000000 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCart.scala +++ /dev/null @@ -1,215 +0,0 @@ -package sample.cqrs - -import java.time.Instant - -import scala.concurrent.duration._ -import akka.actor.typed.ActorRef -import akka.actor.typed.ActorSystem -import akka.actor.typed.Behavior -import akka.actor.typed.SupervisorStrategy -import 
akka.cluster.sharding.typed.scaladsl.ClusterSharding -import akka.cluster.sharding.typed.scaladsl.Entity -import akka.cluster.sharding.typed.scaladsl.EntityTypeKey -import akka.pattern.StatusReply -import akka.persistence.typed.PersistenceId -import akka.persistence.typed.scaladsl.RetentionCriteria -import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.EventSourcedBehavior -import akka.persistence.typed.scaladsl.ReplyEffect - -/** - * This is an event sourced actor. It has a state, [[ShoppingCart.State]], which - * stores the current shopping cart items and whether it's checked out. - * - * Event sourced actors are interacted with by sending them commands, - * see classes implementing [[ShoppingCart.Command]]. - * - * Commands get translated to events, see classes implementing [[ShoppingCart.Event]]. - * It's the events that get persisted by the entity. Each event will have an event handler - * registered for it, and an event handler updates the current state based on the event. - * This will be done when the event is first created, and it will also be done when the entity is - * loaded from the database - each event will be replayed to recreate the state - * of the entity. - */ -object ShoppingCart { - - /** - * The current state held by the persistent entity. 
- */ - final case class State(items: Map[String, Int], checkoutDate: Option[Instant]) extends CborSerializable { - - def isCheckedOut: Boolean = - checkoutDate.isDefined - - def hasItem(itemId: String): Boolean = - items.contains(itemId) - - def isEmpty: Boolean = - items.isEmpty - - def updateItem(itemId: String, quantity: Int): State = { - quantity match { - case 0 => copy(items = items - itemId) - case _ => copy(items = items + (itemId -> quantity)) - } - } - - def removeItem(itemId: String): State = - copy(items = items - itemId) - - def checkout(now: Instant): State = - copy(checkoutDate = Some(now)) - - def toSummary: Summary = - Summary(items, isCheckedOut) - } - object State { - val empty = State(items = Map.empty, checkoutDate = None) - } - - /** - * This interface defines all the commands that the ShoppingCart persistent actor supports. - */ - sealed trait Command extends CborSerializable - - /** - * A command to add an item to the cart. - * - * It can reply with `StatusReply[Summary]`, which is sent back to the caller when - * all the events emitted by this command are successfully persisted. - */ - final case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]]) extends Command - - /** - * A command to remove an item from the cart. - */ - final case class RemoveItem(itemId: String, replyTo: ActorRef[StatusReply[Summary]]) extends Command - - /** - * A command to adjust the quantity of an item in the cart. - */ - final case class AdjustItemQuantity(itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]]) - extends Command - - /** - * A command to checkout the shopping cart. - */ - final case class Checkout(replyTo: ActorRef[StatusReply[Summary]]) extends Command - - /** - * A command to get the current state of the shopping cart. - */ - final case class Get(replyTo: ActorRef[Summary]) extends Command - - /** - * Summary of the shopping cart state, used in reply messages. 
- */ - final case class Summary(items: Map[String, Int], checkedOut: Boolean) extends CborSerializable - - /** - * This interface defines all the events that the ShoppingCart supports. - */ - sealed trait Event extends CborSerializable { - def cartId: String - } - - final case class ItemAdded(cartId: String, itemId: String, quantity: Int) extends Event - - final case class ItemRemoved(cartId: String, itemId: String) extends Event - - final case class ItemQuantityAdjusted(cartId: String, itemId: String, newQuantity: Int) extends Event - - final case class CheckedOut(cartId: String, eventTime: Instant) extends Event - - val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ShoppingCart") - - def init(system: ActorSystem[_], eventProcessorSettings: EventProcessorSettings): Unit = { - ClusterSharding(system).init(Entity(EntityKey) { entityContext => - val n = math.abs(entityContext.entityId.hashCode % eventProcessorSettings.parallelism) - val eventProcessorTag = eventProcessorSettings.tagPrefix + "-" + n - ShoppingCart(entityContext.entityId, Set(eventProcessorTag)) - }.withRole("write-model")) - } - - def apply(cartId: String, eventProcessorTags: Set[String]): Behavior[Command] = { - EventSourcedBehavior - .withEnforcedReplies[Command, Event, State]( - PersistenceId(EntityKey.name, cartId), - State.empty, - (state, command) => - //The shopping cart behavior changes if it's checked out or not. - // The commands are handled differently for each case. 
- if (state.isCheckedOut) checkedOutShoppingCart(cartId, state, command) - else openShoppingCart(cartId, state, command), - (state, event) => handleEvent(state, event)) - .withTagger(_ => eventProcessorTags) - .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3)) - .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1)) - } - - private def openShoppingCart(cartId: String, state: State, command: Command): ReplyEffect[Event, State] = - command match { - case AddItem(itemId, quantity, replyTo) => - if (state.hasItem(itemId)) - Effect.reply(replyTo)(StatusReply.Error(s"Item '$itemId' was already added to this shopping cart")) - else if (quantity <= 0) - Effect.reply(replyTo)(StatusReply.Error("Quantity must be greater than zero")) - else - Effect - .persist(ItemAdded(cartId, itemId, quantity)) - .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary)) - - case RemoveItem(itemId, replyTo) => - if (state.hasItem(itemId)) - Effect - .persist(ItemRemoved(cartId, itemId)) - .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary)) - else - Effect.reply(replyTo)(StatusReply.Success(state.toSummary)) // removing an item is idempotent - - case AdjustItemQuantity(itemId, quantity, replyTo) => - if (quantity <= 0) - Effect.reply(replyTo)(StatusReply.Error("Quantity must be greater than zero")) - else if (state.hasItem(itemId)) - Effect - .persist(ItemQuantityAdjusted(cartId, itemId, quantity)) - .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary)) - else - Effect.reply(replyTo)( - StatusReply.Error(s"Cannot adjust quantity for item '$itemId'. 
Item not present on cart")) - - case Checkout(replyTo) => - if (state.isEmpty) - Effect.reply(replyTo)(StatusReply.Error("Cannot checkout an empty shopping cart")) - else - Effect - .persist(CheckedOut(cartId, Instant.now())) - .thenReply(replyTo)(updatedCart => StatusReply.Success(updatedCart.toSummary)) - - case Get(replyTo) => - Effect.reply(replyTo)(state.toSummary) - } - - private def checkedOutShoppingCart(cartId: String, state: State, command: Command): ReplyEffect[Event, State] = - command match { - case Get(replyTo) => - Effect.reply(replyTo)(state.toSummary) - case cmd: AddItem => - Effect.reply(cmd.replyTo)(StatusReply.Error("Can't add an item to an already checked out shopping cart")) - case cmd: RemoveItem => - Effect.reply(cmd.replyTo)(StatusReply.Error("Can't remove an item from an already checked out shopping cart")) - case cmd: AdjustItemQuantity => - Effect.reply(cmd.replyTo)(StatusReply.Error("Can't adjust item on an already checked out shopping cart")) - case cmd: Checkout => - Effect.reply(cmd.replyTo)(StatusReply.Error("Can't checkout already checked out shopping cart")) - } - - private def handleEvent(state: State, event: Event) = { - event match { - case ItemAdded(_, itemId, quantity) => state.updateItem(itemId, quantity) - case ItemRemoved(_, itemId) => state.removeItem(itemId) - case ItemQuantityAdjusted(_, itemId, quantity) => state.updateItem(itemId, quantity) - case CheckedOut(_, eventTime) => state.checkout(eventTime) - } - } -} diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala deleted file mode 100644 index cf965a8..0000000 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartProjectionHandler.scala +++ /dev/null @@ -1,27 +0,0 @@ -package sample.cqrs - -import akka.Done -import akka.actor.typed.ActorSystem -import akka.actor.typed.eventstream.EventStream -import 
akka.projection.eventsourced.EventEnvelope -import akka.projection.scaladsl.Handler -import org.slf4j.LoggerFactory - -import scala.concurrent.Future - -class ShoppingCartProjectionHandler(tag: String, system: ActorSystem[_]) - extends Handler[EventEnvelope[ShoppingCart.Event]] { - val log = LoggerFactory.getLogger(getClass) - - override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = { - - log.info( - "EventProcessor({}) consumed {} from {} with seqNr {}", - tag, - envelope.event, - envelope.persistenceId, - envelope.sequenceNr) - system.eventStream ! EventStream.Publish(envelope.event) - Future.successful(Done) - } -} diff --git a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala b/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala deleted file mode 100644 index acd5b89..0000000 --- a/akka-sample-cqrs-scala/src/main/scala/sample/cqrs/ShoppingCartRoutes.scala +++ /dev/null @@ -1,110 +0,0 @@ -package sample.cqrs - -import scala.concurrent.Future -import akka.actor.typed.ActorRef -import akka.actor.typed.ActorSystem -import akka.cluster.sharding.typed.scaladsl.ClusterSharding -import akka.http.scaladsl.model.StatusCodes -import akka.http.scaladsl.server.Route -import akka.pattern.StatusReply -import akka.util.Timeout - -object ShoppingCartRoutes { - final case class AddItem(cartId: String, itemId: String, quantity: Int) - final case class UpdateItem(cartId: String, itemId: String, quantity: Int) -} - -class ShoppingCartRoutes()(implicit system: ActorSystem[_]) { - - implicit private val timeout: Timeout = - Timeout.create(system.settings.config.getDuration("shopping.askTimeout")) - private val sharding = ClusterSharding(system) - - import ShoppingCartRoutes._ - import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ - import akka.http.scaladsl.server.Directives._ - import JsonFormats._ - - val shopping: Route = - pathPrefix("shopping") { - pathPrefix("carts") { - concat( - post { - 
entity(as[AddItem]) { - data => - val entityRef = - sharding.entityRefFor(ShoppingCart.EntityKey, data.cartId) - val reply: Future[StatusReply[ShoppingCart.Summary]] = - entityRef.ask(ShoppingCart.AddItem(data.itemId, data.quantity, _)) - onSuccess(reply) { - case StatusReply.Success(summary: ShoppingCart.Summary) => - complete(StatusCodes.OK -> summary) - case StatusReply.Error(reason) => - complete(StatusCodes.BadRequest -> reason) - } - } - }, - put { - entity(as[UpdateItem]) { - data => - val entityRef = - sharding.entityRefFor(ShoppingCart.EntityKey, data.cartId) - - def command(replyTo: ActorRef[StatusReply[ShoppingCart.Summary]]) = - if (data.quantity == 0) - ShoppingCart.RemoveItem(data.itemId, replyTo) - else - ShoppingCart.AdjustItemQuantity(data.itemId, data.quantity, replyTo) - - val reply: Future[StatusReply[ShoppingCart.Summary]] = - entityRef.ask(command(_)) - onSuccess(reply) { - case StatusReply.Success(summary: ShoppingCart.Summary) => - complete(StatusCodes.OK -> summary) - case StatusReply.Error(reason) => - complete(StatusCodes.BadRequest -> reason) - } - } - }, - pathPrefix(Segment) { cartId => - concat(get { - val entityRef = - sharding.entityRefFor(ShoppingCart.EntityKey, cartId) - onSuccess(entityRef.ask(ShoppingCart.Get)) { summary => - if (summary.items.isEmpty) complete(StatusCodes.NotFound) - else complete(summary) - } - }, path("checkout") { - post { - val entityRef = - sharding.entityRefFor(ShoppingCart.EntityKey, cartId) - val reply: Future[StatusReply[ShoppingCart.Summary]] = - entityRef.ask(ShoppingCart.Checkout(_)) - onSuccess(reply) { - case StatusReply.Success(summary: ShoppingCart.Summary) => - complete(StatusCodes.OK -> summary) - case StatusReply.Error(reason) => - complete(StatusCodes.BadRequest -> reason) - } - } - }) - }) - } - } - -} - -object JsonFormats { - - import spray.json.RootJsonFormat - // import the default encoders for primitive types (Int, String, Lists etc) - import spray.json.DefaultJsonProtocol._ - - 
implicit val summaryFormat: RootJsonFormat[ShoppingCart.Summary] = - jsonFormat2(ShoppingCart.Summary) - implicit val addItemFormat: RootJsonFormat[ShoppingCartRoutes.AddItem] = - jsonFormat3(ShoppingCartRoutes.AddItem) - implicit val updateItemFormat: RootJsonFormat[ShoppingCartRoutes.UpdateItem] = - jsonFormat3(ShoppingCartRoutes.UpdateItem) - -} diff --git a/akka-sample-cqrs-scala/src/test/resources/logback-test.xml b/akka-sample-cqrs-scala/src/test/resources/logback-test.xml deleted file mode 100644 index 179c899..0000000 --- a/akka-sample-cqrs-scala/src/test/resources/logback-test.xml +++ /dev/null @@ -1,17 +0,0 @@ -<configuration> - - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>[%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg%n</pattern> - </encoder> - </appender> - - <logger name="com.datastax.oss.driver" level="WARN"/> - <logger name="org.apache.cassandra" level="ERROR"/> - <logger name="com.codahale.metrics" level="INFO"/> - - <root level="INFO"> - <appender-ref ref="STDOUT"/> - </root> - -</configuration> diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala deleted file mode 100644 index 8f1930a..0000000 --- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/IntegrationSpec.scala +++ /dev/null @@ -1,198 +0,0 @@ -package sample.cqrs - -import java.io.File -import java.util.UUID - -import scala.concurrent.Await -import scala.concurrent.duration._ -import akka.actor.testkit.typed.scaladsl.ActorTestKit -import akka.actor.typed.eventstream.EventStream -import akka.cluster.MemberStatus -import akka.cluster.sharding.typed.scaladsl.ClusterSharding -import akka.cluster.typed.Cluster -import akka.cluster.typed.Join -import akka.pattern.StatusReply -import akka.persistence.cassandra.testkit.CassandraLauncher -import akka.persistence.testkit.scaladsl.PersistenceInit -import 
akka.persistence.typed.PersistenceId -import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.EventSourcedBehavior -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory -import org.apache.commons.io.FileUtils -import org.scalatest.BeforeAndAfterAll -import org.scalatest.TestSuite -import org.scalatest.concurrent.Eventually -import org.scalatest.concurrent.PatienceConfiguration -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.matchers.should.Matchers -import org.scalatest.time.Span -import org.scalatest.wordspec.AnyWordSpecLike - -object IntegrationSpec { - val config: Config = ConfigFactory.parseString(s""" - akka.cluster { - seed-nodes = [] - } - - akka.persistence.cassandra { - events-by-tag { - eventual-consistency-delay = 200ms - } - - query { - refresh-interval = 500 ms - } - - journal.keyspace-autocreate = on - journal.tables-autocreate = on - snapshot.keyspace-autocreate = on - snapshot.tables-autocreate = on - } - datastax-java-driver { - basic.contact-points = ["127.0.0.1:19042"] - basic.load-balancing-policy.local-datacenter = "datacenter1" - } - - event-processor { - keep-alive-interval = 1 seconds - } - akka.loglevel = DEBUG - akka.actor.testkit.typed.single-expect-default = 5s - # For LoggingTestKit - akka.actor.testkit.typed.filter-leeway = 5s - akka.actor.testkit.typed.throw-on-shutdown-timeout = off - """).withFallback(ConfigFactory.load()) -} - -class IntegrationSpec - extends TestSuite - with Matchers - with BeforeAndAfterAll - with AnyWordSpecLike - with ScalaFutures - with Eventually { - - implicit private val patience: PatienceConfig = - PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis)) - - private val databaseDirectory = new File("target/cassandra-IntegrationSpec") - - private def roleConfig(role: String): Config = - ConfigFactory.parseString(s"akka.cluster.roles = [$role]") - - // one TestKit (ActorSystem) per cluster node - private val testKit1 = 
ActorTestKit("IntegrationSpec", roleConfig("write-model").withFallback(IntegrationSpec.config)) - private val testKit2 = - ActorTestKit("IntegrationSpec", roleConfig("write-model").withFallback(IntegrationSpec.config)) - private val testKit3 = ActorTestKit("IntegrationSpec", roleConfig("read-model").withFallback(IntegrationSpec.config)) - private val testKit4 = ActorTestKit("IntegrationSpec", roleConfig("read-model").withFallback(IntegrationSpec.config)) - - private val systems3 = List(testKit1.system, testKit2.system, testKit3.system) - - override protected def beforeAll(): Unit = { - CassandraLauncher.start( - databaseDirectory, - CassandraLauncher.DefaultTestConfigResource, - clean = true, - port = 19042, // default is 9042, but use different for test - CassandraLauncher.classpathForResources("logback-test.xml")) - - // avoid concurrent creation of keyspace and tables - initializePersistence() - Main.createTables(testKit1.system) - - super.beforeAll() - } - - private def initializePersistence(): Unit = { - val timeout = 10.seconds - val done = PersistenceInit.initializeDefaultPlugins(testKit1.system, timeout) - Await.result(done, timeout) - } - - override protected def afterAll(): Unit = { - super.afterAll() - - testKit4.shutdownTestKit() - testKit3.shutdownTestKit() - testKit2.shutdownTestKit() - testKit1.shutdownTestKit() - - CassandraLauncher.stop() - FileUtils.deleteDirectory(databaseDirectory) - } - - "Shopping Cart application" should { - "init and join Cluster" in { - testKit1.spawn[Nothing](Guardian(), "guardian") - testKit2.spawn[Nothing](Guardian(), "guardian") - testKit3.spawn[Nothing](Guardian(), "guardian") - // node4 is initialized and joining later - - systems3.foreach { sys => - Cluster(sys).manager ! 
Join(Cluster(testKit1.system).selfMember.address) - } - - // let the nodes join and become Up - eventually(PatienceConfiguration.Timeout(10.seconds)) { - systems3.foreach { sys => - Cluster(sys).selfMember.status should ===(MemberStatus.Up) - } - } - } - - "update and consume from different nodes" in { - val cart1 = ClusterSharding(testKit1.system).entityRefFor(ShoppingCart.EntityKey, "cart-1") - val probe1 = testKit1.createTestProbe[StatusReply[ShoppingCart.Summary]] - - val cart2 = ClusterSharding(testKit2.system).entityRefFor(ShoppingCart.EntityKey, "cart-2") - val probe2 = testKit2.createTestProbe[StatusReply[ShoppingCart.Summary]] - - val eventProbe3 = testKit3.createTestProbe[ShoppingCart.Event]() - testKit3.system.eventStream ! EventStream.Subscribe(eventProbe3.ref) - - // update from node1, consume event from node3 - cart1 ! ShoppingCart.AddItem("foo", 42, probe1.ref) - probe1.receiveMessage().isSuccess should ===(true) - eventProbe3.expectMessage(ShoppingCart.ItemAdded("cart-1", "foo", 42)) - - // update from node2, consume event from node3 - cart2 ! ShoppingCart.AddItem("bar", 17, probe2.ref) - probe2.receiveMessage().isSuccess should ===(true) - cart2 ! ShoppingCart.AdjustItemQuantity("bar", 18, probe2.ref) - probe2.receiveMessage().isSuccess should ===(true) - eventProbe3.expectMessage(ShoppingCart.ItemAdded("cart-2", "bar", 17)) - eventProbe3.expectMessage(ShoppingCart.ItemQuantityAdjusted("cart-2", "bar", 18)) - } - - "continue even processing from offset" in { - // give it time to write the offset before shutting down - Thread.sleep(1000) - testKit3.shutdownTestKit() - - val eventProbe4 = testKit4.createTestProbe[ShoppingCart.Event]() - testKit4.system.eventStream ! EventStream.Subscribe(eventProbe4.ref) - - testKit4.spawn[Nothing](Guardian(), "guardian") - - Cluster(testKit4.system).manager ! 
Join(Cluster(testKit1.system).selfMember.address) - - // let the node join and become Up - eventually(PatienceConfiguration.Timeout(10.seconds)) { - Cluster(testKit4.system).selfMember.status should ===(MemberStatus.Up) - } - - val cart3 = ClusterSharding(testKit1.system).entityRefFor(ShoppingCart.EntityKey, "cart-3") - val probe3 = testKit1.createTestProbe[StatusReply[ShoppingCart.Summary]] - - // update from node1, consume event from node4 - cart3 ! ShoppingCart.AddItem("abc", 43, probe3.ref) - probe3.receiveMessage().isSuccess should ===(true) - // note that node4 is new, but continues reading from previous offset, i.e. not receiving events - // that have already been consumed - eventProbe4.expectMessage(ShoppingCart.ItemAdded("cart-3", "abc", 43)) - } - - } -} diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala deleted file mode 100644 index 5932ea9..0000000 --- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ProjectionSpec.scala +++ /dev/null @@ -1,87 +0,0 @@ -package sample.cqrs - -import java.io.File - -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import akka.actor.typed.eventstream.EventStream -import akka.pattern.StatusReply -import akka.persistence.cassandra.testkit.CassandraLauncher -import akka.projection.testkit.scaladsl.ProjectionTestKit -import com.typesafe.config.ConfigFactory -import org.apache.commons.io.FileUtils -import org.scalatest.BeforeAndAfterAll -import org.scalatest.wordspec.AnyWordSpecLike - -object ProjectionSpec { - def config = - ConfigFactory.parseString(""" - akka.actor.provider=local - akka.persistence.cassandra { - events-by-tag { - eventual-consistency-delay = 200ms - } - - query { - refresh-interval = 500 ms - } - - journal.keyspace-autocreate = on - journal.tables-autocreate = on - snapshot.keyspace-autocreate = on - snapshot.tables-autocreate = on - } - datastax-java-driver { - 
basic.contact-points = ["127.0.0.1:19042"] - basic.load-balancing-policy.local-datacenter = "datacenter1" - } - """).withFallback(ConfigFactory.load()) // re-use application.conf other settings -} - -class ProjectionSpec - extends ScalaTestWithActorTestKit(ProjectionSpec.config) - with AnyWordSpecLike - with BeforeAndAfterAll { - val projectionTestKit = ProjectionTestKit(testKit) - val settings = EventProcessorSettings(system) - - val databaseDirectory = new File("target/cassandra-ProjectionSpec") - - override protected def beforeAll(): Unit = { - CassandraLauncher.start( - databaseDirectory, - CassandraLauncher.DefaultTestConfigResource, - clean = true, - port = 19042, // default is 9042, but use different for test - CassandraLauncher.classpathForResources("logback-test.xml")) - - Main.createTables(system) - - super.beforeAll() - } - - override protected def afterAll(): Unit = { - super.afterAll() - CassandraLauncher.stop() - FileUtils.deleteDirectory(databaseDirectory) - } - - "The events from the Shopping Cart" should { - - "be published to the system event stream by the projection" in { - val cartProbe = createTestProbe[Any]() - val cart = spawn(ShoppingCart("cart-1", Set(s"${settings.tagPrefix}-0"))) - cart ! ShoppingCart.AddItem("25", 12, cartProbe.ref) - cartProbe.expectMessageType[StatusReply[ShoppingCart.Summary]].isSuccess should ===(true) - - val eventProbe = createTestProbe[ShoppingCart.Event]() - system.eventStream ! 
EventStream.Subscribe(eventProbe.ref) - projectionTestKit.run(Guardian.createProjectionFor(system, settings, 0)) { - val added = eventProbe.expectMessageType[ShoppingCart.ItemAdded] - added.cartId should ===("cart-1") - added.itemId should ===("25") - added.quantity should ===(12) - } - } - } - -} diff --git a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala b/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala deleted file mode 100644 index 898f923..0000000 --- a/akka-sample-cqrs-scala/src/test/scala/sample/cqrs/ShoppingCartSpec.scala +++ /dev/null @@ -1,86 +0,0 @@ -package sample.cqrs - -import java.util.UUID - -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import akka.pattern.StatusReply -import org.scalatest.wordspec.AnyWordSpecLike - -class ShoppingCartSpec extends ScalaTestWithActorTestKit(s""" - akka.persistence.journal.plugin = "akka.persistence.journal.inmem" - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}" - """) with AnyWordSpecLike { - - private var counter = 0 - def newCartId(): String = { - counter += 1 - s"cart-$counter" - } - - "The Shopping Cart" should { - - "add item" in { - val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty)) - val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]] - cart ! ShoppingCart.AddItem("foo", 42, probe.ref) - probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false))) - } - - "reject already added item" in { - val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty)) - val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]] - cart ! ShoppingCart.AddItem("foo", 42, probe.ref) - probe.receiveMessage().isSuccess should ===(true) - cart ! 
ShoppingCart.AddItem("foo", 13, probe.ref) - probe.receiveMessage().isError should ===(true) - } - - "remove item" in { - val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty)) - val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]] - cart ! ShoppingCart.AddItem("foo", 42, probe.ref) - probe.receiveMessage().isSuccess should ===(true) - cart ! ShoppingCart.RemoveItem("foo", probe.ref) - probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map.empty, checkedOut = false))) - } - - "adjust quantity" in { - val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty)) - val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]] - cart ! ShoppingCart.AddItem("foo", 42, probe.ref) - probe.receiveMessage().isSuccess should ===(true) - cart ! ShoppingCart.AdjustItemQuantity("foo", 43, probe.ref) - probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 43), checkedOut = false))) - } - - "checkout" in { - val cart = testKit.spawn(ShoppingCart(newCartId(), Set.empty)) - val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]] - cart ! ShoppingCart.AddItem("foo", 42, probe.ref) - probe.receiveMessage().isSuccess should ===(true) - cart ! ShoppingCart.Checkout(probe.ref) - probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = true))) - - cart ! ShoppingCart.AddItem("bar", 13, probe.ref) - probe.receiveMessage().isError should ===(true) - } - - "keep its state" in { - val cartId = newCartId() - val cart = testKit.spawn(ShoppingCart(cartId, Set.empty)) - val probe = testKit.createTestProbe[StatusReply[ShoppingCart.Summary]] - cart ! 
ShoppingCart.AddItem("foo", 42, probe.ref) - probe.expectMessage(StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false))) - - testKit.stop(cart) - - // start again with same cartId - val restartedCart = testKit.spawn(ShoppingCart(cartId, Set.empty)) - val stateProbe = testKit.createTestProbe[ShoppingCart.Summary] - restartedCart ! ShoppingCart.Get(stateProbe.ref) - stateProbe.expectMessage(ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false)) - } - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
