This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/pekko-persistence-cassandra.git
The following commit(s) were added to refs/heads/main by this push:
new d89f0d5 update tests to use testcontainers (#404)
d89f0d5 is described below
commit d89f0d51dda68f4a619269e2574265fc5c47b645
Author: PJ Fanning <[email protected]>
AuthorDate: Mon May 25 09:47:58 2026 +0100
update tests to use testcontainers (#404)
* Replace CassandraLauncher with testcontainers in ReconnectSpec and
EventsByTagMultiJvmSpec
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-cassandra/sessions/4e9c6664-673c-4179-9825-f6e7a804d9fe
Co-authored-by: pjfanning <[email protected]>
* Address code review: add @volatile, add comment on host parameter
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-cassandra/sessions/4e9c6664-673c-4179-9825-f6e7a804d9fe
Co-authored-by: pjfanning <[email protected]>
* refactor
* remove cassandra-launcher
* Delete CassandraLauncherSpec.scala
* avoid deprecated classes
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
build.sbt | 38 +-
.../src/main/resources/log4j.properties | 4 -
cassandra-bundle/src/main/resources/logback.xml | 17 -
cassandra-launcher/.gitignore | 1 -
.../cassandra/testkit/CassandraLauncher.scala | 428 ---------------------
.../cassandra/EventsByTagMultiJvmSpec.scala | 43 ++-
.../persistence/cassandra/ReconnectSpec.scala | 14 +-
.../cassandra/query/EventsByTagSpec.scala | 8 +-
.../cassandra/testkit/CassandraLauncherSpec.scala | 74 ----
project/Dependencies.scala | 5 +-
10 files changed, 40 insertions(+), 592 deletions(-)
diff --git a/build.sbt b/build.sbt
index 76fe5c1..bfeb188 100644
--- a/build.sbt
+++ b/build.sbt
@@ -23,7 +23,7 @@ lazy val root = project
.in(file("."))
.enablePlugins(Common, ScalaUnidocPlugin)
.disablePlugins(SitePlugin, MimaPlugin)
- .aggregate(core, cassandraLauncher)
+ .aggregate(core)
.settings(name := "pekko-persistence-cassandra-root", publish / skip := true)
lazy val dumpSchema = taskKey[Unit]("Dumps cassandra schema for docs")
@@ -32,7 +32,6 @@ dumpSchema := (core / Test / runMain).toTask("
org.apache.pekko.persistence.cass
lazy val core = project
.in(file("core"))
.enablePlugins(Common, AutomateHeaderPlugin, MimaPlugin, MultiJvmPlugin,
ReproducibleBuildsPlugin)
- .dependsOn(cassandraLauncher % Test)
.addPekkoModuleDependency("pekko-connectors-cassandra", "",
PekkoConnectorsDependency.default)
.addPekkoModuleDependency("pekko-persistence", "",
PekkoCoreDependency.default)
.addPekkoModuleDependency("pekko-persistence-query", "",
PekkoCoreDependency.default)
@@ -55,41 +54,6 @@ lazy val core = project
organization.value %% name.value % mimaCompareVersion))
.configs(MultiJvm)
-lazy val cassandraLauncher = project
- .in(file("cassandra-launcher"))
- .enablePlugins(Common, ReproducibleBuildsPlugin)
- .disablePlugins(MimaPlugin)
- .settings(
- name := "pekko-persistence-cassandra-launcher",
- Compile / unmanagedResources += (cassandraBundle / Compile /
packageBin).value)
-
-// This project doesn't get published directly, rather the assembled artifact
is included as part of cassandraLaunchers
-// resources
-lazy val cassandraBundle = project
- .in(file("cassandra-bundle"))
- .enablePlugins(Common, AutomateHeaderPlugin)
- .disablePlugins(MimaPlugin)
- .settings(
- name := "pekko-persistence-cassandra-bundle",
- crossPaths := false,
- autoScalaLibrary := false,
- libraryDependencies += ("org.apache.cassandra" % "cassandra-all" %
"3.11.3")
- .exclude("commons-logging", "commons-logging"),
- dependencyOverrides += "com.github.jbellis" % "jamm" % "0.3.3", // See
jamm comment in https://issues.apache.org/jira/browse/CASSANDRA-9608
- assembly / assemblyJarName := "cassandra-bundle.jar",
- Compile / packageBin := Def.taskDyn {
- val store = streams.value.cacheStoreFactory.make("shaded-output")
- val uberJarLocation = (assembly / assemblyOutputPath).value
- val tracker = Tracked.outputChanged(store) { (changed: Boolean, file:
File) =>
- if (changed) {
- Def.task {
- (Compile / assembly).value
- }
- } else Def.task { file }
- }
- tracker(() => uberJarLocation)
- }.value)
-
// Used for testing events by tag in various environments
lazy val endToEndExample = project
.in(file("example"))
diff --git a/cassandra-bundle/src/main/resources/log4j.properties
b/cassandra-bundle/src/main/resources/log4j.properties
deleted file mode 100644
index 1db758a..0000000
--- a/cassandra-bundle/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,4 +0,0 @@
-log4j.rootLogger=ERROR, stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.SimpleLayout
-log4j.logger.org.apache.cassandra.service.CassandraDaemon=OFF
diff --git a/cassandra-bundle/src/main/resources/logback.xml
b/cassandra-bundle/src/main/resources/logback.xml
deleted file mode 100644
index 70c10dc..0000000
--- a/cassandra-bundle/src/main/resources/logback.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
-
- <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
- <target>System.out</target>
- <encoder>
- <pattern>%date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} -
%m%n%xException</pattern>
- </encoder>
- </appender>
-
- <logger name="org.apache.cassandra" level="ERROR" />
-
- <root level="INFO">
- <appender-ref ref="CONSOLE"/>
- </root>
-
-</configuration>
\ No newline at end of file
diff --git a/cassandra-launcher/.gitignore b/cassandra-launcher/.gitignore
deleted file mode 100644
index 4fae3a9..0000000
--- a/cassandra-launcher/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/.target/
diff --git
a/cassandra-launcher/src/main/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncher.scala
b/cassandra-launcher/src/main/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncher.scala
deleted file mode 100644
index bc61dd6..0000000
---
a/cassandra-launcher/src/main/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncher.scala
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.persistence.cassandra.testkit
-
-import java.io._
-import java.net.{ InetSocketAddress, Socket, URI }
-import java.nio.channels.ServerSocketChannel
-import java.nio.file.Files
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicReference
-import scala.annotation.varargs
-import scala.collection.immutable
-import scala.concurrent.duration._
-import scala.util.control.NonFatal
-import scala.util.{ Failure, Try }
-
-/**
- * Starts Cassandra in current JVM. There can only be one Cassandra instance
per JVM,
- * but keyspaces can be used for isolation.
- */
-object CassandraLauncher {
-
- class CleanFailedException(message: String, cause: Throwable) extends
RuntimeException(message, cause)
-
- private val ForcedShutdownTimeout = 20.seconds
- // Used in fork mode to wait for Cassandra to start listening
- private val AwaitListenTimeout = 45.seconds
- private val AwaitListenPoll = 100.millis
-
- /**
- * Default config for testing "test-embedded-cassandra.yaml"
- */
- val DefaultTestConfigResource: String = "test-embedded-cassandra.yaml"
-
- /**
- * Main method to start Cassandra, see [[#start]].
- * Note that `cassandra-all` jar must be in classpath.
- *
- * `port can be defined with `-DCassandraLauncher.port=4000`,
- * default is the `randomPort`
- * `clean` can be defined with `-DCassandraLauncher.clean=true`,
- * default is `false`
- * `directory` can be defined with
`-DCassandraLauncher.directory=target/embedded-cassandra`,
- * default is `target/embedded-cassandra`
- * `configResource` yaml configuration loaded from classpath,
- * can be defined with
`-DCassandraLauncher.configResource=test-embedded-cassandra.yaml`,
- * default is defined in [[CassandraLauncher#DefaultTestConfigResource]],
- * i.e. `test-embedded-cassandra.yaml`
- */
- def main(args: Array[String]): Unit = {
- val port: Int =
- if (args.length > 0) args(0).toInt
- else Integer.getInteger("CassandraLauncher.port", 0)
- val clean =
- if (args.length > 1) args(1).toBoolean
- else java.lang.Boolean.getBoolean("CassandraLauncher.clean")
- val dir =
- if (args.length > 2) new File(args(2))
- else
- new File(System.getProperty("CassandraLauncher.directory",
"target/embedded-cassandra"))
- val configResource =
- if (args.length > 3) args(3)
- else
- System.getProperty("CassandraLauncher.configResource",
DefaultTestConfigResource)
- start(dir, configResource, clean, port)
- }
-
- private var cassandraDaemon: Option[Closeable] = None
-
- private val DEFAULT_HOST = "127.0.0.1"
-
- private val initialPortsValue = (0, 0)
- private val selectedPorts: AtomicReference[(Int, Int)] = new
AtomicReference(initialPortsValue)
-
- /**
- * The random free port that will be used if `port=0` is
- * specified in the `start` method.
- *
- * Calling `randomPort` before `start` is not recommended. It will fix the
value and won't necessarily
- * reflect the value that is effectively used by the launcher.
- */
- lazy val randomPort: Int = {
- selectedPorts.compareAndSet(initialPortsValue,
selectFreePorts(DEFAULT_HOST, 0))
- selectedPorts.get()._1
- }
-
- /**
- * Select two free ports.
- * Note that requestPort is always used even if user requested a fixed port.
We want to make sure the port is not in use
- * and won't conflict with the storagePort
- */
- private def selectFreePorts(host: String, requestedPort: Int): (Int, Int) = {
-
- val clientSocket = ServerSocketChannel.open().socket()
- val storageSocket = ServerSocketChannel.open().socket()
-
- try {
- clientSocket.bind(new InetSocketAddress(host, requestedPort))
- storageSocket.bind(new InetSocketAddress(host, 0))
-
- // return both ports
- (clientSocket.getLocalPort, storageSocket.getLocalPort)
-
- } finally {
- // close independently
- val t1 = Try(clientSocket.close())
- val t2 = Try(storageSocket.close())
-
- // if one the two failed, we should throw the exception
- (t1, t2) match {
- case (Failure(ex1), Failure(ex2)) =>
- throw new RuntimeException(
- s"Failed to close sockets: client '${ex1.getMessage}', storage
'${ex2.getMessage}'")
- case (Failure(ex1), _) => throw new RuntimeException(s"Failed to close
client-port socket: '${ex1.getMessage}'")
- case (_, Failure(ex2)) =>
- throw new RuntimeException(s"Failed to close storage-port socket:
'${ex2.getMessage}'")
- case (_, _) => // we are fine, all closed
- }
-
- }
-
- }
-
- /**
- * Use this to locate classpath elements from the current classpath to add
- * to the classpath of the launched Cassandra.
- *
- * This is particularly useful if you want a custom logging, you can use
- * this to ensure that the directory that your log file is in is on the
- * classpath of the forked Cassandra process, for example:
- *
- * ```
- * CassandraLauncher.start(
- * cassandraDirectory,
- * CassandraLauncher.DefaultTestConfigResource,
- * clean = true,
- * port = 0,
- * CassandraLauncher.classpathForResources("logback.xml")
- * )
- * ```
- *
- * Files ending with `assembly.jar` are not included in the result because an
- * assembly jar will likely contain incompatible classes that shouldn't be
on the
- * classpath of the Cassandra server, such as incompatible dependency of
Guava.
- * Assembly jars are used when running multi-node testing.
- */
- @varargs
- def classpathForResources(resources: String*): immutable.Seq[String] = {
- resources
- .map { resource =>
- this.getClass.getClassLoader.getResource(resource) match {
- case null =>
- sys.error("Resource not found: " + resource)
- case fileUrl if fileUrl.getProtocol == "file" =>
- new
File(URI.create(fileUrl.toString.stripSuffix(resource))).getCanonicalPath
- case jarUrl if jarUrl.getProtocol == "jar" =>
- new File(URI.create(jarUrl.getPath.takeWhile(_ !=
'!'))).getCanonicalPath
- case _ =>
- sys.error("Resource not supported: " + resource)
- }
- }
- .distinct
- .toList
- .filterNot(_.endsWith("assembly.jar")) // TODO required?
- }
-
- /**
- * Start Cassandra
- *
- * @param cassandraDirectory the data directory to use
- * @param configResource yaml configuration loaded from classpath,
- * default configuration for testing is defined in
[[CassandraLauncher#DefaultTestConfigResource]]
- * @param clean if `true` all files in the data directory will be deleted
- * before starting Cassandra
- * @param port the `native_transport_port` to use, if 0 a random
- * free port is used, which can be retrieved (before starting)
- * with [[CassandraLauncher.randomPort]].
- * @throws
org.apache.pekko.persistence.cassandra.testkit.CassandraLauncher.CleanFailedException
if `clean`
- * is `true` and removal of the directory fails
- */
- def start(cassandraDirectory: File, configResource: String, clean: Boolean,
port: Int): Unit =
- start(cassandraDirectory, configResource, clean, port, Nil)
-
- /**
- * Start Cassandra
- *
- * @param cassandraDirectory the data directory to use
- * @param configResource yaml configuration loaded from classpath,
- * default configuration for testing is defined in
[[CassandraLauncher#DefaultTestConfigResource]]
- * @param clean if `true` all files in the data directory will be deleted
- * before starting Cassandra
- * @param port the `native_transport_port` to use, if 0 a random
- * free port is used, which can be retrieved (before starting)
- * with [[CassandraLauncher.randomPort]].
- * @param classpath Any additional jars/directories to add to the classpath.
Use
- * [[CassandraLauncher#classpathForResources]] to assist in
calculating this.
- * @throws
org.apache.pekko.persistence.cassandra.testkit.CassandraLauncher.CleanFailedException
if `clean`
- * is `true` and removal of the directory fails
- */
- def start(
- cassandraDirectory: File,
- configResource: String,
- clean: Boolean,
- port: Int,
- classpath: immutable.Seq[String]): Unit =
- start(cassandraDirectory, configResource, clean, port, classpath, None)
-
- /**
- * Start Cassandra
- *
- * @param cassandraDirectory the data directory to use
- * @param configResource yaml configuration loaded from classpath,
- * default configuration for testing is defined in
[[CassandraLauncher#DefaultTestConfigResource]]
- * @param clean if `true` all files in the data directory will be deleted
- * before starting Cassandra
- * @param port the `native_transport_port` to use, if 0 a random
- * free port is used, which can be retrieved (before starting)
- * with [[CassandraLauncher.randomPort]].
- * @param classpath Any additional jars/directories to add to the classpath.
Use
- * [[CassandraLauncher#classpathForResources]] to assist in
calculating this.
- * @param host the host to bind the embeded Cassandra to. If None, then
127.0.0.1 is used.
- * @throws
org.apache.pekko.persistence.cassandra.testkit.CassandraLauncher.CleanFailedException
if `clean`
- * is `true` and removal of the directory fails
- */
- def start(
- cassandraDirectory: File,
- configResource: String,
- clean: Boolean,
- port: Int,
- classpath: immutable.Seq[String],
- host: Option[String]): Unit = this.synchronized {
- if (cassandraDaemon.isEmpty) {
-
- prepareCassandraDirectory(cassandraDirectory, clean)
-
- val realHost = host.getOrElse(DEFAULT_HOST)
-
- // NOTE: read comments bellow to get the full picture
- if (port != 0) {
- // if user explicitly passes a port, we should use it and override
`selectedPorts`
- // selectedPorts may have been already set if user has previously
called `randomPort`
- // in such a case, randomPort will be fixed to a 'wrong' old number
- selectedPorts.set(selectFreePorts(realHost, port))
- } else {
- // if a random port is requested, we only override `selectedPorts` if
not yet calculated (eg: default (0,0)).
- // If user has previously called `randomPort`, we will already have a
value and we should keep using it.
- selectedPorts.compareAndSet(initialPortsValue,
selectFreePorts(realHost, port))
- }
-
- val (realPort, storagePort) = selectedPorts.get()
-
- println(
- s"Starting Cassandra on port client port: $realPort storage port
$storagePort host $realHost java version ${System
- .getProperty("java.runtime.version")}")
-
- // http://wiki.apache.org/cassandra/StorageConfiguration
- val conf = readResource(configResource)
- val amendedConf = conf
- .replace("$PORT", realPort.toString)
- .replace("$STORAGE_PORT", storagePort.toString)
- .replace("$DIR", cassandraDirectory.getAbsolutePath)
- .replace("$HOST", realHost)
- val configFile = new File(cassandraDirectory, configResource)
- writeToFile(configFile, amendedConf)
-
- // Extract the cassandra bundle to the directory
- val cassandraBundleFile =
- new File(cassandraDirectory, "cassandra-bundle.jar")
- if (!cassandraBundleFile.exists()) {
- val is =
-
this.getClass.getClassLoader.getResourceAsStream("cassandra-bundle.jar")
- try {
- Files.copy(is, cassandraBundleFile.toPath)
- } finally {
- if (is != null) is.close()
- }
- }
-
- startForked(configFile, cassandraBundleFile, classpath, realHost,
realPort)
- }
- }
-
- private def prepareCassandraDirectory(cassandraDirectory: File, clean:
Boolean): Unit = {
- if (clean) {
- try {
- deleteRecursive(cassandraDirectory)
- } catch {
- // deleteRecursive may throw AssertionError
- case e: AssertionError =>
- throw new CleanFailedException(e.getMessage, e)
- case NonFatal(e) => throw new CleanFailedException(e.getMessage, e)
- }
- }
-
- if (!cassandraDirectory.exists)
- require(cassandraDirectory.mkdirs(), s"Couldn't create Cassandra
directory [$cassandraDirectory]")
- }
-
- private def startForked(
- configFile: File,
- cassandraBundle: File,
- classpath: immutable.Seq[String],
- host: String,
- port: Int): Unit = {
- // Calculate classpath
- val / = File.separator
- val javaBin = s"${System.getProperty("java.home")}${/}bin${/}java"
- val className = "org.apache.cassandra.service.CassandraDaemon"
- val classpathArgument = (classpath :+
cassandraBundle.getAbsolutePath).mkString(File.pathSeparator)
-
- val builder = new ProcessBuilder(
- javaBin,
- "-cp",
- classpathArgument,
- "-Dcassandra.config=file:" + configFile.getAbsoluteFile,
- "-Dcassandra-foreground=true",
- className).inheritIO()
-
- val process = builder.start()
-
- val shutdownHook = new Thread {
- override def run(): Unit = {
- process.destroyForcibly()
- }
- }
- Runtime.getRuntime.addShutdownHook(shutdownHook)
-
- // We wait for Cassandra to start listening before we return, since
running in non fork mode will also not
- // return until Cassandra has started listening.
- waitForCassandraToListen(host, port)
- cassandraDaemon = Some(new Closeable {
- override def close(): Unit = {
- process.destroy()
- try {
- Runtime.getRuntime.removeShutdownHook(shutdownHook)
- } catch {
- case _: IllegalStateException =>
- // JVM is already shutting down
- }
-
- if (process.waitFor(ForcedShutdownTimeout.toMillis,
TimeUnit.MILLISECONDS)) {
- val exitStatus = process.exitValue()
- // Java processes killed with SIGTERM may exit with a status of 143
- if (exitStatus != 0 && exitStatus != 143) {
- sys.error(s"Cassandra exited with non zero status:
${process.exitValue()}")
- }
- } else {
- process.destroyForcibly()
- sys.error(s"Cassandra process did not stop within
$ForcedShutdownTimeout, killing.")
- }
- }
- })
- }
-
- /**
- * Stops Cassandra. However, it will not be possible to start Cassandra
- * again in same JVM.
- */
- def stop(): Unit = this.synchronized {
- cassandraDaemon.foreach(_.close())
- cassandraDaemon = None
- }
-
- private def readResource(resource: String): String = {
- val sb = new StringBuilder
- val is = getClass.getResourceAsStream("/" + resource)
- require(is != null, s"resource [$resource] doesn't exist")
- val reader = new BufferedReader(new InputStreamReader(is))
- try {
- var line = reader.readLine()
- while (line != null) {
- sb.append(line).append('\n')
- line = reader.readLine()
- }
- } finally {
- reader.close()
- }
- sb.toString
- }
-
- private def writeToFile(file: File, content: String): Unit = {
- val writer = new BufferedWriter(new OutputStreamWriter(new
FileOutputStream(file), "utf-8"))
- try {
- writer.write(content)
- } finally {
- writer.close()
- }
- }
-
- private def waitForCassandraToListen(host: String, port: Int) = {
- val deadline = AwaitListenTimeout.fromNow
- @annotation.tailrec
- def tryConnect(): Unit = {
- val retry =
- try {
- new Socket(host, port).close()
- false
- } catch {
- case _: IOException if deadline.hasTimeLeft() =>
- Thread.sleep(AwaitListenPoll.toMillis)
- true
- case ioe: IOException =>
- throw new RuntimeException(s"Cassandra did not start within
$AwaitListenTimeout", ioe)
- }
- if (retry) tryConnect()
- }
- tryConnect()
- }
-
- private def deleteRecursive(file: File): Unit = {
- if (file.isDirectory) {
- file.listFiles().foreach(deleteRecursive)
- }
- file.delete()
- }
-
-}
diff --git
a/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
b/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
index f29c923..b522ea5 100644
---
a/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
+++
b/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
@@ -11,8 +11,9 @@ package org.apache.pekko.cluster.persistence.cassandra
import com.typesafe.config.ConfigFactory
import org.apache.pekko
+import pekko.persistence.cassandra.CassandraLifecycle
+import pekko.persistence.cassandra.query.TestActor
import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal
-import pekko.persistence.cassandra.testkit.CassandraLauncher
import pekko.persistence.journal.Tagged
import pekko.persistence.query.{ NoOffset, PersistenceQuery }
import pekko.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
@@ -20,8 +21,7 @@ import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.scaladsl.TestSink
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
-
-import java.io.File
+import org.testcontainers.cassandra.CassandraContainer
object EventsByTagMultiJvmSpec extends MultiNodeConfig {
// No way to start and distribute the port so hard coding
@@ -53,6 +53,13 @@ object EventsByTagMultiJvmSpec extends MultiNodeConfig {
keyspace = $name
}
}
+
+ datastax-java-driver {
+ basic {
+ load-balancing-policy.local-datacenter = "datacenter1"
+ contact-points = ["127.0.0.1:$CassPort"]
+ }
+ }
""").withFallback(CassandraLifecycle.config))
}
@@ -75,6 +82,8 @@ abstract class EventsByTagMultiJvmSpec
override def initialParticipants: Int = roles.size
+ @volatile private var cassandraContainer: CassandraContainer[_] = _
+
"EventsByTag" must {
"init Cassandra" in {
@@ -147,26 +156,26 @@ abstract class EventsByTagMultiJvmSpec
}
enterBarrier("all-done")
+
+ runOn(node1) {
+ stopCassandra()
+ }
}
}
- def startCassandra(
- host: String,
- port: Int,
- systemName: String,
- cassandraConfigResource: String =
CassandraLauncher.DefaultTestConfigResource): Unit = {
- val cassandraDirectory = new File(s"target/$systemName-$port")
- CassandraLauncher.start(
- cassandraDirectory,
- configResource = cassandraConfigResource,
- clean = true,
- port = port,
- CassandraLauncher.classpathForResources("logback-test.xml"),
- Some(host))
+ def startCassandra(host: String, port: Int, systemName: String): Unit = {
+ // With testcontainers, Docker binds to all interfaces (0.0.0.0) by
default,
+ // so the host parameter is not needed for binding.
+ cassandraContainer = new CassandraContainer("cassandra:3.11")
+ cassandraContainer.setPortBindings(java.util.Arrays.asList(s"$port:9042"))
+ cassandraContainer.start()
}
def stopCassandra(): Unit = {
- CassandraLauncher.stop()
+ if (cassandraContainer != null) {
+ cassandraContainer.stop()
+ cassandraContainer = null
+ }
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/ReconnectSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/ReconnectSpec.scala
index c881641..ce84df4 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/ReconnectSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/ReconnectSpec.scala
@@ -13,17 +13,16 @@
package org.apache.pekko.persistence.cassandra
-import java.io.File
import org.apache.pekko
import pekko.actor.{ ActorSystem, Props }
import pekko.persistence.cassandra.CassandraLifecycle.AwaitPersistenceInit
import pekko.testkit.{ ImplicitSender, SocketUtil, TestKit }
import com.typesafe.config.ConfigFactory
-import pekko.persistence.cassandra.testkit.CassandraLauncher
import org.scalatest.Suite
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
+import org.testcontainers.cassandra.CassandraContainer
object ReconnectSpec {
val freePort = SocketUtil.temporaryLocalPort()
@@ -52,17 +51,14 @@ class ReconnectSpec
pa ! "hello"
expectNoMessage()
- CassandraLauncher.start(
- new File("target/ReconnectSpec"),
- configResource = CassandraLauncher.DefaultTestConfigResource,
- clean = true,
- port = ReconnectSpec.freePort,
- CassandraLauncher.classpathForResources("logback-test.xml"))
+ val cassandraContainer = new CassandraContainer("cassandra:3.11")
+
cassandraContainer.setPortBindings(java.util.Arrays.asList(s"${ReconnectSpec.freePort}:9042"))
+ cassandraContainer.start()
try {
CassandraLifecycle.awaitPersistenceInit(system)
} finally {
- CassandraLauncher.stop()
+ cassandraContainer.stop()
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
index ddda87f..396578f 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
@@ -16,12 +16,15 @@ package org.apache.pekko.persistence.cassandra.query
import java.time.temporal.ChronoUnit
import java.time.{ LocalDate, LocalDateTime, ZoneOffset }
import java.util.Optional
+
+import scala.concurrent.duration._
+
import java.util.UUID
import org.apache.pekko
import pekko.actor.{ PoisonPill, Props }
import pekko.event.Logging.Warning
import pekko.persistence.cassandra.journal.CassandraJournalStatements
-import pekko.persistence.cassandra.{ CassandraLifecycle, CassandraSpec, Day }
+import pekko.persistence.cassandra.{ CassandraLifecycle, CassandraSpec, Day,
PluginSettings }
import pekko.persistence.journal.{ Tagged, WriteEventAdapter }
import pekko.persistence.query.scaladsl.{ CurrentEventsByTagQuery,
EventsByTagQuery }
import pekko.persistence.query.{ EventEnvelope, NoOffset, Offset,
TimeBasedUUID }
@@ -38,9 +41,6 @@ import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.Eventually.eventually
-import scala.concurrent.duration._
-import pekko.persistence.cassandra.PluginSettings
-
object EventsByTagSpec {
def withProbe[T](probe: TestSubscriber.Probe[Any], f:
TestSubscriber.Probe[Any] => T): T = {
try {
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncherSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncherSpec.scala
deleted file mode 100644
index 354b74b..0000000
---
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncherSpec.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.persistence.cassandra.testkit
-
-import java.io.File
-import java.net.InetSocketAddress
-
-import org.apache.pekko
-import pekko.actor.ActorSystem
-import pekko.testkit.TestKit
-import com.datastax.oss.driver.api.core.CqlSession
-import org.scalatest.wordspec.AnyWordSpecLike
-import org.scalatest.matchers.should.Matchers
-
-import scala.concurrent.duration._
-import org.scalatest.BeforeAndAfterAll
-
-class CassandraLauncherSpec
- extends TestKit(ActorSystem("CassandraLauncherSpec"))
- with Matchers
- with AnyWordSpecLike
- with BeforeAndAfterAll {
-
- override protected def afterAll(): Unit = {
- shutdown(system, verifySystemShutdown = true)
- CassandraLauncher.stop()
- super.afterAll()
- }
-
- private def testCassandra(): Unit = {
- val session =
- CqlSession
- .builder()
- .withLocalDatacenter("datacenter1")
- .addContactPoint(new InetSocketAddress("localhost",
CassandraLauncher.randomPort))
- .build()
- try session.execute("SELECT now() from system.local;").one()
- finally {
- session.close()
- }
- }
-
- "The CassandraLauncher" must {
- "support forking" in {
- val cassandraDirectory = new File("target/" + system.name)
- CassandraLauncher.start(
- cassandraDirectory,
- configResource = CassandraLauncher.DefaultTestConfigResource,
- clean = true,
- port = 0,
- CassandraLauncher.classpathForResources("logback-test.xml"))
-
- awaitAssert({
- testCassandra()
- }, 45.seconds)
-
- CassandraLauncher.stop()
-
- an[Exception] shouldBe thrownBy(testCassandra())
- }
- }
-
-}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 36b7337..d4b47fb 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -31,12 +31,15 @@ object Dependencies {
val nettyVersion = "4.2.14.Final"
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
+ val testcontainersVersion = "2.0.5"
+
val pekkoPersistenceCassandraDependencies = Seq(
"org.apache.cassandra" % "java-driver-core" % driverVersion,
"io.netty" % "netty-handler" % nettyVersion,
logback % Test,
"org.scalatest" %% "scalatest" % "3.2.20" % Test,
- "org.pegdown" % "pegdown" % "1.6.0" % Test)
+ "org.pegdown" % "pegdown" % "1.6.0" % Test,
+ "org.testcontainers" % "testcontainers-cassandra" % testcontainersVersion
% Test)
val exampleDependencies = Seq(
logback,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]