This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pekko-persistence-cassandra.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e21d33  build with pekko 2 (#407)
3e21d33 is described below

commit 3e21d335a30791af16fb60e93978db3632f1fb1e
Author: PJ Fanning <[email protected]>
AuthorDate: Mon May 25 18:00:23 2026 +0100

    build with pekko 2 (#407)
    
    * build with pekko 2
    
    * Update build.sbt
    
    * compile issues
    
    * Update EventProcessorStream.scala
    
    * Update EventProcessorStream.scala
    
    * Update CassandraSnapshotStore.scala
    
    * Update build.sbt
    
    * update docs
---
 build.sbt                                          |  7 +-
 .../pekko/persistence/cassandra/Retries.scala      |  4 +-
 .../cassandra/query/AllPersistenceIdsStage.scala   |  4 +-
 .../query/EventsByPersistenceIdStage.scala         |  5 +-
 .../snapshot/CassandraSnapshotStore.scala          |  4 +-
 docs/src/main/paradox/journal.md                   | 88 +++++++++++++++++++++-
 docs/src/main/paradox/snapshots.md                 | 33 +++++++-
 .../cassandra/example/EventProcessorStream.scala   | 11 +--
 project/PekkoConnectorsDependency.scala            |  2 +-
 project/PekkoCoreDependency.scala                  |  2 +-
 project/PekkoManagementDependency.scala            |  2 +-
 11 files changed, 139 insertions(+), 23 deletions(-)

diff --git a/build.sbt b/build.sbt
index bfeb188..6897128 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,6 +18,7 @@ sourceDistIncubating := false
 val mimaCompareVersion = "1.0.0"
 
 ThisBuild / reproducibleBuildsCheckResolver := Resolver.ApacheMavenStagingRepo
+ThisBuild / evictionErrorLevel := Level.Info
 
 lazy val root = project
   .in(file("."))
@@ -51,7 +52,11 @@ lazy val core = project
       "Automatic-Module-Name" -> "pekko.persistence.cassandra"),
     mimaReportSignatureProblems := true,
     mimaPreviousArtifacts := Set(
-      organization.value %% name.value % mimaCompareVersion))
+      organization.value %% name.value % mimaCompareVersion),
+    // following is needed by Agrona lib
+    // https://github.com/aeron-io/agrona/wiki/Change-Log#200-2024-12-17
+    Test / fork := true,
+    Test / javaOptions += "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED")
   .configs(MultiJvm)
 
 // Used for testing events by tag in various environments
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/Retries.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/Retries.scala
index 87364b4..650c46a 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/Retries.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/Retries.scala
@@ -16,7 +16,7 @@ package org.apache.pekko.persistence.cassandra
 import org.apache.pekko
 import pekko.actor.Scheduler
 import pekko.annotation.InternalApi
-import pekko.pattern.{ after, BackoffSupervisor }
+import pekko.pattern.{ after, RetrySupport }
 
 import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration.FiniteDuration
@@ -58,7 +58,7 @@ private[cassandra] object Retries {
     if (maxAttempts == -1 || maxAttempts - attempted != 1) {
       tryAttempt().recoverWith {
         case NonFatal(exc) =>
-          val nextDelay = BackoffSupervisor.calculateDelay(attempted, minBackoff, maxBackoff, randomFactor)
+          val nextDelay = RetrySupport.calculateDelay(attempted, minBackoff, maxBackoff, randomFactor)
           onFailure(attempted + 1, exc, nextDelay)
           after(nextDelay, scheduler) {
             retry(attempt, maxAttempts, onFailure, minBackoff, maxBackoff, randomFactor, attempted + 1)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsStage.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsStage.scala
index 40f4ae8..2f7eab3 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsStage.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/AllPersistenceIdsStage.scala
@@ -22,7 +22,6 @@ import pekko.stream.{ Attributes, Outlet, SourceShape }
 import com.datastax.oss.driver.api.core.CqlSession
 import com.datastax.oss.driver.api.core.cql.AsyncResultSet
 import com.datastax.oss.driver.api.core.cql.PreparedStatement
-import scala.annotation.nowarn
 
 import scala.collection.immutable.Queue
 import scala.concurrent.duration._
@@ -108,7 +107,6 @@ import scala.concurrent.duration._
         }
       }
 
-      @nowarn("msg=deprecated") // keep compatible with akka 2.5
       override def preStart(): Unit = {
         query()
         refreshInterval.foreach { interval =>
@@ -117,7 +115,7 @@ import scala.concurrent.duration._
               (interval / 2) + 
ThreadLocalRandom.current().nextLong(interval.toMillis / 2).millis
             else interval
 
-          schedulePeriodicallyWithInitialDelay(Continue, initial, interval)
+          scheduleWithFixedDelay(Continue, initial, interval)
         }
       }
 
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
index 48c842d..421ce42 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
@@ -24,7 +24,7 @@ import pekko.stream.{ Attributes, Outlet, SourceShape }
 
 import java.lang.{ Long => JLong }
 import java.util.concurrent.ThreadLocalRandom
-import scala.annotation.{ nowarn, tailrec }
+import scala.annotation.tailrec
 import scala.concurrent.duration.{ FiniteDuration, _ }
 import scala.concurrent.{ ExecutionContext, Future, Promise }
 import scala.jdk.FutureConverters._
@@ -265,9 +265,8 @@ import scala.util.{ Failure, Success, Try }
         }
       }
 
-      @nowarn("msg=deprecated")
       private def scheduleContinue(initial: FiniteDuration, interval: FiniteDuration): Unit = {
-        schedulePeriodicallyWithInitialDelay(Continue, initial, interval)
+        scheduleWithFixedDelay(Continue, initial, interval)
       }
 
       override def postStop(): Unit = {
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
index 7026488..3d8cc98 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
@@ -29,7 +29,7 @@ import pekko.persistence.snapshot.SnapshotStore
 import pekko.serialization.{ AsyncSerializer, Serialization, 
SerializationExtension, Serializers }
 import pekko.stream.connectors.cassandra.scaladsl.{ CassandraSession, 
CassandraSessionRegistry }
 import pekko.stream.scaladsl.{ Sink, Source }
-import pekko.util.{ unused, OptionVal }
+import pekko.util.OptionVal
 
 import java.lang.{ Long => JLong }
 import java.nio.ByteBuffer
@@ -42,7 +42,7 @@ import scala.util.{ Failure, Success }
 /**
  * INTERNAL API
  */
-@InternalApi private[pekko] class CassandraSnapshotStore(@unused cfg: Config, cfgPath: String)
+@InternalApi private[pekko] class CassandraSnapshotStore(cfg: Config, cfgPath: String)
     extends SnapshotStore
     with ActorLogging {
 
diff --git a/docs/src/main/paradox/journal.md b/docs/src/main/paradox/journal.md
index 7467051..65ae619 100644
--- a/docs/src/main/paradox/journal.md
+++ b/docs/src/main/paradox/journal.md
@@ -30,12 +30,96 @@ CREATE KEYSPACE IF NOT EXISTS pekko WITH replication = 
{'class': 'NetworkTopolog
 
 For local testing, and the default if you enable 
`pekko.persistence.cassandra.journal.keyspace-autocreate` you can use the 
following:
 
-@@snip [journal-schema](/target/journal-keyspace.txt) { #journal-keyspace } 
+```
+CREATE KEYSPACE IF NOT EXISTS pekko
+WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };
+```
 
 There are multiple tables required. These need to be created before starting 
your application.
 For local testing you can enable 
`pekko.persistence.cassandra.journal.tables-autocreate`. The default table 
definitions look like this:
 
-@@snip [journal-tables](/target/journal-tables.txt) { #journal-tables } 
+```
+CREATE TABLE IF NOT EXISTS pekko.messages (
+  persistence_id text,
+  partition_nr bigint,
+  sequence_nr bigint,
+  timestamp timeuuid,
+  timebucket text,
+  writer_uuid text,
+  ser_id int,
+  ser_manifest text,
+  event_manifest text,
+  event blob,
+  meta_ser_id int,
+  meta_ser_manifest text,
+  meta blob,
+  tags set<text>,
+  PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp))
+  WITH gc_grace_seconds = 864000
+  AND compaction = {
+    'class' : 'SizeTieredCompactionStrategy',
+    'enabled' : true,
+    'tombstone_compaction_interval' : 86400,
+    'tombstone_threshold' : 0.2,
+    'unchecked_tombstone_compaction' : false,
+    'bucket_high' : 1.5,
+    'bucket_low' : 0.5,
+    'max_threshold' : 32,
+    'min_threshold' : 4,
+    'min_sstable_size' : 50
+    };
+
+CREATE TABLE IF NOT EXISTS pekko.tag_views (
+  tag_name text,
+  persistence_id text,
+  sequence_nr bigint,
+  timebucket bigint,
+  timestamp timeuuid,
+  tag_pid_sequence_nr bigint,
+  writer_uuid text,
+  ser_id int,
+  ser_manifest text,
+  event_manifest text,
+  event blob,
+  meta_ser_id int,
+  meta_ser_manifest text,
+  meta blob,
+  PRIMARY KEY ((tag_name, timebucket), timestamp, persistence_id, 
tag_pid_sequence_nr))
+  WITH gc_grace_seconds = 864000
+  AND compaction = {
+    'class' : 'SizeTieredCompactionStrategy',
+    'enabled' : true,
+    'tombstone_compaction_interval' : 86400,
+    'tombstone_threshold' : 0.2,
+    'unchecked_tombstone_compaction' : false,
+    'bucket_high' : 1.5,
+    'bucket_low' : 0.5,
+    'max_threshold' : 32,
+    'min_threshold' : 4,
+    'min_sstable_size' : 50
+    };
+
+CREATE TABLE IF NOT EXISTS pekko.tag_write_progress(
+  persistence_id text,
+  tag text,
+  sequence_nr bigint,
+  tag_pid_sequence_nr bigint,
+  offset timeuuid,
+  PRIMARY KEY (persistence_id, tag));
+
+CREATE TABLE IF NOT EXISTS pekko.tag_scanning(
+  persistence_id text,
+  sequence_nr bigint,
+  PRIMARY KEY (persistence_id));
+
+CREATE TABLE IF NOT EXISTS pekko.metadata(
+  persistence_id text PRIMARY KEY,
+  deleted_to bigint,
+  properties map<text,text>);
+
+CREATE TABLE IF NOT EXISTS pekko.all_persistence_ids(
+  persistence_id text PRIMARY KEY);
+```
 
 ### Messages table
 
diff --git a/docs/src/main/paradox/snapshots.md 
b/docs/src/main/paradox/snapshots.md
index 55d4b8c..682a4db 100644
--- a/docs/src/main/paradox/snapshots.md
+++ b/docs/src/main/paradox/snapshots.md
@@ -26,13 +26,42 @@ CREATE KEYSPACE IF NOT EXISTS pekko_snapshot WITH 
replication = {'class': 'Netwo
 
 For local testing, and the default if you enable 
`pekko.persistence.cassandra.snapshot.keyspace-autocreate` you can use the 
following:
 
-@@snip [snapshot-keyspace](/target/snapshot-keyspace.txt) { #snapshot-keyspace 
} 
+```
+CREATE KEYSPACE IF NOT EXISTS pekko_snapshot
+ WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };
+```
 
 A single table is required. This needs to be created before starting your 
application.
 For local testing you can enable 
`pekko.persistence.cassandra.snapshot.tables-autocreate`.
 The default table definitions look like this:
 
-@@snip [snapshot-tables](/target/snapshot-tables.txt) { #snapshot-tables}
+```
+CREATE TABLE IF NOT EXISTS pekko_snapshot.snapshots (
+  persistence_id text,
+  sequence_nr bigint,
+  timestamp bigint,
+  ser_id int,
+  ser_manifest text,
+  snapshot_data blob,
+  snapshot blob,
+  meta_ser_id int,
+  meta_ser_manifest text,
+  meta blob,
+  PRIMARY KEY (persistence_id, sequence_nr))
+  WITH CLUSTERING ORDER BY (sequence_nr DESC) AND gc_grace_seconds = 864000
+  AND compaction = {
+    'class' : 'SizeTieredCompactionStrategy',
+    'enabled' : true,
+    'tombstone_compaction_interval' : 86400,
+    'tombstone_threshold' : 0.2,
+    'unchecked_tombstone_compaction' : false,
+    'bucket_high' : 1.5,
+    'bucket_low' : 0.5,
+    'max_threshold' : 32,
+    'min_threshold' : 4,
+    'min_sstable_size' : 50
+    };
+```
 
 ### ScyllaDB
 
diff --git 
a/example/src/main/scala/org/apache/pekko/persistence/cassandra/example/EventProcessorStream.scala
 
b/example/src/main/scala/org/apache/pekko/persistence/cassandra/example/EventProcessorStream.scala
index 874b469..02ce548 100644
--- 
a/example/src/main/scala/org/apache/pekko/persistence/cassandra/example/EventProcessorStream.scala
+++ 
b/example/src/main/scala/org/apache/pekko/persistence/cassandra/example/EventProcessorStream.scala
@@ -12,15 +12,15 @@ package org.apache.pekko.persistence.cassandra.example
 import org.apache.pekko
 import pekko.{ Done, NotUsed }
 import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.LoggerOps
 import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal
 import pekko.persistence.query.{ Offset, PersistenceQuery, TimeBasedUUID }
 import pekko.persistence.typed.PersistenceId
-import pekko.stream.SharedKillSwitch
+import pekko.stream.{ RestartSettings, SharedKillSwitch }
 import pekko.stream.connectors.cassandra.scaladsl.CassandraSessionRegistry
 import pekko.stream.scaladsl.{ RestartSource, Sink, Source }
 import com.datastax.oss.driver.api.core.cql.{ PreparedStatement, Row }
 import org.slf4j.{ Logger, LoggerFactory }
-import pekko.actor.typed.scaladsl.LoggerOps
 import org.HdrHistogram.Histogram
 
 import scala.concurrent.{ ExecutionContext, Future }
@@ -34,16 +34,17 @@ class EventProcessorStream[Event: ClassTag](
     tag: String) {
 
   protected val log: Logger = LoggerFactory.getLogger(getClass)
-  implicit val sys: ActorSystem[_] = system
-  implicit val ec: ExecutionContext = executionContext
+  private implicit val sys: ActorSystem[_] = system
+  private implicit val ec: ExecutionContext = executionContext
 
   private val session = 
CassandraSessionRegistry(system).sessionFor("pekko.persistence.cassandra")
 
   private val query = 
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
 
   def runQueryStream(killSwitch: SharedKillSwitch, histogram: Histogram): Unit = {
+    val restartSettings = RestartSettings(minBackoff = 500.millis, maxBackoff = 20.seconds, randomFactor = 0.1)
     RestartSource
-      .withBackoff(minBackoff = 500.millis, maxBackoff = 20.seconds, randomFactor = 0.1) { () =>
+      .withBackoff(restartSettings) { () =>
         Source.futureSource {
           readOffset().map { offset =>
             log.infoN("Starting stream for tag [{}] from offset [{}]", tag, 
offset)
diff --git a/project/PekkoConnectorsDependency.scala 
b/project/PekkoConnectorsDependency.scala
index 7fe187e..7974cf5 100644
--- a/project/PekkoConnectorsDependency.scala
+++ b/project/PekkoConnectorsDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
 object PekkoConnectorsDependency extends PekkoDependency {
   override val checkProject: String = "pekko-connectors-cassandra"
   override val module: Option[String] = Some("connectors")
-  override val currentVersion: String = "1.3.0"
+  override val currentVersion: String = "2.0.0-M1"
 }
diff --git a/project/PekkoCoreDependency.scala 
b/project/PekkoCoreDependency.scala
index ed3ab68..37903c6 100644
--- a/project/PekkoCoreDependency.scala
+++ b/project/PekkoCoreDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
 object PekkoCoreDependency extends PekkoDependency {
   override val checkProject: String = "pekko-cluster-sharding-typed"
   override val module: Option[String] = None
-  override val currentVersion: String = "1.5.0"
+  override val currentVersion: String = "2.0.0-M2"
 }
diff --git a/project/PekkoManagementDependency.scala 
b/project/PekkoManagementDependency.scala
index e1a4c81..1e1a76d 100644
--- a/project/PekkoManagementDependency.scala
+++ b/project/PekkoManagementDependency.scala
@@ -20,5 +20,5 @@ import com.github.pjfanning.pekkobuild.PekkoDependency
 object PekkoManagementDependency extends PekkoDependency {
   override val checkProject: String = "pekko-discovery-aws-api-async"
   override val module: Option[String] = Some("management")
-  override val currentVersion: String = "1.2.1"
+  override val currentVersion: String = "2.0.0-M1"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to