This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch akka-sample-persistence-multi-dc-scala in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 24003fafca2e062b2adfb23aee6316476bf42d0e Author: Arnout Engelen <[email protected]> AuthorDate: Wed Oct 25 17:08:15 2017 +0200 Use persistence-multi-dc-testkit --- akka-sample-persistence-multi-dc-scala/build.sbt | 10 +- .../src/test/resources/logback-test.xml | 18 --- .../persistence/multidc/testkit/BaseSpec.scala | 131 --------------------- .../multidc/testkit/CassandraLifecycle.scala | 107 ----------------- ...InterruptableCassandraReadJournalProvider.scala | 108 ----------------- 5 files changed, 4 insertions(+), 370 deletions(-) diff --git a/akka-sample-persistence-multi-dc-scala/build.sbt b/akka-sample-persistence-multi-dc-scala/build.sbt index 8a1486b..27fa2b0 100644 --- a/akka-sample-persistence-multi-dc-scala/build.sbt +++ b/akka-sample-persistence-multi-dc-scala/build.sbt @@ -8,13 +8,11 @@ resolvers += "com-mvn" at "https://repo.lightbend.com/commercial-releases/" resolvers += Resolver.url("com-ivy", url("https://repo.lightbend.com/commercial-releases/"))(Resolver.ivyStylePatterns) +val persistenceMultiDcVersion = "1.1-M4+3-667a6ef6" + libraryDependencies ++= Seq( - "com.lightbend.akka" %% "akka-persistence-multi-dc" % "1.1-M4", - // TODO replace with akka-persistence-multi-dc-testkit and move test infra there - "com.typesafe.akka" %% "akka-testkit" % "2.5.6" % "test", - // TODO make dependency of akka-persistence-multi-dc-testkit - "com.typesafe.akka" %% "akka-stream-contrib" % "0.8" % "test", - "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.58" % "test", + "com.lightbend.akka" %% "akka-persistence-multi-dc" % persistenceMultiDcVersion, + "com.lightbend.akka" %% "akka-persistence-multi-dc-testkit" % persistenceMultiDcVersion % "test", "org.scalatest" %% "scalatest" % "3.0.1" % "test" ) diff --git a/akka-sample-persistence-multi-dc-scala/src/test/resources/logback-test.xml b/akka-sample-persistence-multi-dc-scala/src/test/resources/logback-test.xml deleted file mode 100644 index e75548e..0000000 --- a/akka-sample-persistence-multi-dc-scala/src/test/resources/logback-test.xml +++ /dev/null @@ -1,18 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<configuration> - - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <target>System.out</target> - <encoder> - <pattern>%date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} - %m%n%xException</pattern> - </encoder> - </appender> - - <logger name="org.apache.cassandra" level="ERROR" /> - <logger name="com.datastax.driver.core" level="WARN" /> - - <root level="INFO"> - <appender-ref ref="CONSOLE"/> - </root> - -</configuration> \ No newline at end of file diff --git a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/BaseSpec.scala b/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/BaseSpec.scala deleted file mode 100644 index 2f99118..0000000 --- a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/BaseSpec.scala +++ /dev/null @@ -1,131 +0,0 @@ -package akka.persistence.multidc.testkit - -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory - -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.cluster.Cluster -import akka.persistence.multidc.PersistenceMultiDcSettings -import akka.persistence.query.PersistenceQuery -import akka.testkit.ImplicitSender -import akka.testkit.TestKit -import akka.testkit.TestProbe - -import org.scalatest.BeforeAndAfterAll -import org.scalatest.Matchers -import org.scalatest.WordSpecLike -import org.scalatest.BeforeAndAfter - -import akka.persistence.multidc.testkit._ -import akka.persistence.multidc.internal.CassandraReplicatedEventQuery -import akka.persistence.multidc.internal.ReplicatedEventEnvelope - -object BaseSpec { - val clusterConfig = ConfigFactory.parseString(""" - akka.actor.provider = "cluster" - akka.remote.netty.tcp.port = 0 - akka.remote.artery.canonical.port = 0 - akka.remote.artery.canonical.hostname = 127.0.0.1 - akka.cluster.jmx.multi-mbeans-in-same-jvm = on - - # inceasing probability due to issue https://github.com/akka/akka/issues/23803 - akka.cluster.multi-data-center.cross-data-center-gossip-probability = 0.5 - - # speed up joining and such - akka.cluster.gossip-interval = 500 ms - """) - - def createFirstSystem(name: String): ActorSystem = createFirstSystem(name, ConfigFactory.empty()) - def createFirstSystem(name: String, cfg: Config): ActorSystem = ActorSystem( - name, - cfg.withFallback(ConfigFactory.parseString(s""" - akka.loglevel = INFO - akka.cluster.multi-data-center.self-data-center = DC-A - akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ] - cassandra-journal-multi-dc.keyspace=$name - cassandra-snapshot-store.keyspace=${name}Snapshot - cassandra-query-journal-multi-dc.class = "${classOf[InterruptableCassandraReadJournalProvider].getName}" - """)).withFallback(BaseSpec.clusterConfig).withFallback(CassandraLifecycle.config)) - - def otherSystemSettings: Config = - ConfigFactory.parseString(""" - akka.cluster.multi-data-center.self-data-center = DC-B - akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ] - """) - - def createOtherSystem(firstSystem: ActorSystem) = - ActorSystem( - firstSystem.name, - ConfigFactory.parseString(""" - akka.cluster.multi-data-center.self-data-center = DC-B - akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ] - """).withFallback(firstSystem.settings.config)) - - def createThirdSystem(firstSystem: ActorSystem) = - ActorSystem( - firstSystem.name, - ConfigFactory.parseString(""" - akka.cluster.multi-data-center.self-data-center = DC-C - akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ] - """).withFallback(firstSystem.settings.config)) - -} - -// TODO move a lot of this to a trait in testkit? -abstract class BaseSpec(name: String, cfg: Config = ConfigFactory.empty) - extends TestKit(BaseSpec.createFirstSystem(name, cfg)) - with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfter with CassandraLifecycleScalatest { - import BaseSpec._ - - val otherSystem = createOtherSystem(system) - val thirdSystem = createThirdSystem(system) - - val settings = PersistenceMultiDcSettings(system) - val otherSettings = PersistenceMultiDcSettings(otherSystem) - val thirdSettings = PersistenceMultiDcSettings(thirdSystem) - - private def queries(sys: ActorSystem): InterruptableCassandraReplicatedEventQuery = - PersistenceQuery(sys).readJournalFor[InterruptableCassandraReplicatedEventQuery](CassandraReplicatedEventQuery.Identifier) - - def disableReplication(from: ActorSystem, to: ActorSystem) = - queries(to).disableReplication(Cluster(from).selfDataCenter) - - def enableReplication(from: ActorSystem, to: ActorSystem) = - queries(to).enableReplication(Cluster(from).selfDataCenter) - - private[multidc] def addErrorFilter(sys: ActorSystem, key: String)(f: ReplicatedEventEnvelope => Option[Throwable]): Unit = - queries(sys).addErrorFilter(key)(f) - - def removeErrorFilter(sys: ActorSystem, key: String): Unit = - queries(sys).removeErrorFilter(key) - - override def systemName: String = name - - after { - Seq(system, otherSystem, thirdSystem).foreach { sys => - queries(sys).enableAll() - } - } - - override protected def afterAll(): Unit = { - shutdown(otherSystem) - shutdown(thirdSystem) - shutdown() - super.afterAll() - } - - def stopA(ref: ActorRef): Unit = - stop(ref, system) - - def stopB(ref: ActorRef): Unit = - stop(ref, otherSystem) - - def stop(ref: ActorRef, sys: ActorSystem): Unit = { - val probe = TestProbe()(sys) - probe.watch(ref) - sys.stop(ref) - probe.expectTerminated(ref) - } - -} diff --git a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/CassandraLifecycle.scala b/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/CassandraLifecycle.scala deleted file mode 100644 index caac3a4..0000000 --- a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/CassandraLifecycle.scala +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com> - */ -package akka.persistence.multidc.testkit - -import java.io.File -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration._ -import akka.actor.ActorSystem -import akka.actor.Props -import akka.persistence.PersistentActor -import akka.persistence.cassandra.testkit.CassandraLauncher -import akka.testkit.TestKitBase -import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfterAll -import org.scalatest.Suite - -object CassandraLifecycle { - - val config = ConfigFactory.parseString(s""" - akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store" - cassandra-journal-multi-dc.port = ${CassandraLauncher.randomPort} - cassandra-snapshot-store.port = ${CassandraLauncher.randomPort} - cassandra-journal-multi-dc.circuit-breaker.call-timeout = 30s - akka.test.single-expect-default = 10s - """) - - def awaitPersistenceInit(system: ActorSystem, journalPluginId: String = "", snapshotPluginId: String = ""): Unit = { - val probe = TestProbe()(system) - val t0 = System.nanoTime() - var n = 0 - probe.within(45.seconds) { - probe.awaitAssert { - n += 1 - system.actorOf(Props(classOf[AwaitPersistenceInit], journalPluginId, snapshotPluginId), "persistenceInit" + n).tell("hello", probe.ref) - probe.expectMsg(5.seconds, "hello") - system.log.debug("awaitPersistenceInit took {} ms {}", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0), system.name) - } - } - } - - class AwaitPersistenceInit( - override val journalPluginId: String, - override val snapshotPluginId: String) extends PersistentActor { - def persistenceId: String = "persistenceInit" - - def receiveRecover: Receive = { - case _ => - } - - def receiveCommand: Receive = { - case msg => - persist(msg) { _ => - sender() ! msg - context.stop(self) - } - } - } - - def startCassandra(systemName: String, cassandraConfigResource: String): Unit = - startCassandra(None, None, systemName, cassandraConfigResource) - - def startCassandra(host: Option[String], port: Option[Int], - systemName: String, - cassandraConfigResource: String = CassandraLauncher.DefaultTestConfigResource): Unit = { - val cassandraDirectory = new File("target/" + systemName) - CassandraLauncher.start( - cassandraDirectory, - configResource = cassandraConfigResource, - clean = true, - port = port.getOrElse(0), - CassandraLauncher.classpathForResources("logback-test.xml"), - host) - } - - def stopCassandra(): Unit = { - CassandraLauncher.stop() - } - - def awaitPersistenceInit(system: ActorSystem): Unit = { - CassandraLifecycle.awaitPersistenceInit(system, journalPluginId = "cassandra-journal-multi-dc") - } - -} - -trait CassandraLifecycleScalatest extends BeforeAndAfterAll { this: TestKitBase with Suite => - - import CassandraLifecycle._ - - def systemName: String - - def cassandraConfigResource: String = CassandraLauncher.DefaultTestConfigResource - - override protected def beforeAll(): Unit = { - startCassandra(None, None, systemName, cassandraConfigResource) - awaitPersistenceInit(system) - super.beforeAll() - } - - override protected def afterAll(): Unit = { - shutdown(system, verifySystemShutdown = true) - stopCassandra() - super.afterAll() - } -} diff --git a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/InterruptableCassandraReadJournalProvider.scala b/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/InterruptableCassandraReadJournalProvider.scala deleted file mode 100644 index 0e6f029..0000000 --- a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/InterruptableCassandraReadJournalProvider.scala +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com> - */ -package akka.persistence.multidc.testkit - -import scala.annotation.tailrec -import scala.concurrent.Future - -import com.typesafe.config.Config - -import akka.NotUsed -import akka.actor.ExtendedActorSystem -import akka.actor.ExtendedActorSystem -import akka.annotation.InternalApi -import akka.event.Logging -import akka.persistence.query.ReadJournalProvider -import akka.persistence.cassandra.query.EventsByPersistenceIdStage -import akka.persistence.multidc.internal.CassandraReplicatedEventQuery -import akka.persistence.multidc.internal.ReplicatedEventEnvelope -import akka.stream.contrib.{ SwitchMode, Valve, ValveSwitch } -import akka.stream.scaladsl.{ Keep, Source } - -@InternalApi private[testkit] class InterruptableCassandraReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider { - - override val scaladslReadJournal: CassandraReplicatedEventQuery = - new InterruptableCassandraReplicatedEventQuery(system, config) - - override val javadslReadJournal: CassandraReplicatedEventQuery = - scaladslReadJournal - -} - -/** - * It is retrieved with: - * {{{ - * val queries = PersistenceQuery(system).readJournalFor[InterruptableCassandraReplicatedEventQuery](CassandraReplicatedEventQeury.Identifier) - * }}} - */ -@InternalApi private[akka] class InterruptableCassandraReplicatedEventQuery(system: ExtendedActorSystem, config: Config) - extends CassandraReplicatedEventQuery(system, config) { - - private val log = Logging(system, getClass) - private var sourcesByDc = Map.empty[String, List[ValveSwitch]] - private var pendingDisable = Set.empty[String] - private var errors = Map.empty[String, ReplicatedEventEnvelope => Option[Throwable]] - - override def replicatedEvents(persistenceId: String, fromDc: String, sequenceNr: Long): Source[ReplicatedEventEnvelope, Future[EventsByPersistenceIdStage.Control]] = { - super.replicatedEvents(persistenceId, fromDc, sequenceNr) - .viaMat(new Valve(SwitchMode.Open))(Keep.both) - .map { e => - errors.foreach { case (_, f) => f(e).foreach(throw _) } - e - } - .mapMaterializedValue { - case (control, fs) => - add(fromDc, fs) - control - } - } - - def disableReplication(dc: String) = synchronized { - sourcesByDc.get(dc) match { - case Some(values) => values.foreach(_.flip(SwitchMode.Close)) - case None => pendingDisable += dc // not started yet - } - } - - def enableReplication(dc: String) = synchronized { - sourcesByDc.get(dc) match { - case Some(values) => values.foreach(_.flip(SwitchMode.Open)) - case None => pendingDisable -= dc - } - - } - - def enableAll(): Unit = synchronized { - sourcesByDc.keys.foreach(enableReplication) - pendingDisable = Set.empty - } - - private def add(dc: String, fs: Future[ValveSwitch]): Unit = { - implicit val ec = system.dispatcher - fs.onSuccess { case switch => add(dc, switch) } - } - - private def add(dc: String, switch: ValveSwitch): Unit = synchronized { - val oldList = sourcesByDc.getOrElse(dc, Nil) - sourcesByDc = sourcesByDc.updated(dc, switch :: oldList) - if (pendingDisable(dc)) { - switch.flip(SwitchMode.Close) - pendingDisable -= dc - } - } - - def addErrorFilter(key: String)(f: ReplicatedEventEnvelope => Option[Throwable]): Unit = synchronized { - errors = errors.updated(key, f) - } - - def removeErrorFilter(key: String): Unit = synchronized { - errors = errors - key - } - - def removeAllErrorFilters(): Unit = synchronized { - errors = Map.empty - } - -} - --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
