This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 1805179  scala3 support for core module (#52)
1805179 is described below

commit 1805179e1b9e1f22a2ba25188e465fb87fb8ea4c
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Sep 1 17:59:21 2023 +0100

    scala3 support for core module (#52)
---
 .github/workflows/build-test.yml                   |  64 ++++++++++-
 build.sbt                                          |   9 +-
 .../persistence/r2dbc/journal/JournalDao.scala     |   4 +-
 .../r2dbc/query/R2dbcReadJournalProvider.scala     |   8 +-
 .../r2dbc/query/scaladsl/QueryDao.scala            |  24 ++--
 .../persistence/r2dbc/snapshot/SnapshotDao.scala   |   8 +-
 .../r2dbc/state/scaladsl/DurableStateDao.scala     |  14 +--
 .../r2dbc/journal/PersistTagsSpec.scala            |   2 +-
 .../r2dbc/journal/PersistTimestampSpec.scala       |   4 +-
 .../query/CurrentPersistenceIdsQuerySpec.scala     |   2 +-
 .../r2dbc/query/EventsByPersistenceIdSpec.scala    |   2 +-
 .../r2dbc/query/EventsBySlicePubSubSpec.scala      |  10 +-
 .../r2dbc/query/EventsBySliceSpec.scala            |   2 +-
 .../state/CurrentPersistenceIdsQuerySpec.scala     |   2 +-
 .../r2dbc/state/DurableStateBySliceSpec.scala      |   4 +-
 .../projection/R2dbcProjectionDocExample.scala     |   2 +-
 project/Dependencies.scala                         |   1 +
 .../projection/r2dbc/R2dbcOffsetStoreSpec.scala    |   2 +-
 .../projection/r2dbc/R2dbcProjectionSpec.scala     | 124 ++++++++++-----------
 .../r2dbc/R2dbcTimestampOffsetProjectionSpec.scala |  66 +++++------
 20 files changed, 213 insertions(+), 141 deletions(-)

diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index 5ee9af3..23e9ec9 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -9,7 +9,7 @@ on:
 
 jobs:
   compile:
-    name: Test and compile
+    name: Test and compile (scala 2.12/2.13)
     runs-on: ubuntu-latest
     strategy:
       matrix:
@@ -36,6 +36,30 @@ jobs:
       - name: Compile and test for JDK ${{ matrix.JAVA_VERSION }}, Scala ${{ 
matrix.SCALA_VERSION }}
         run: sbt ++${{ matrix.SCALA_VERSION }} test:compile
 
+  compile3:
+    name: Test and compile (scala 3)
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+
+      - name: Setup Java 8
+        uses: actions/setup-java@v3
+        with:
+          distribution: temurin
+          java-version: 8
+
+      - name: Cache Coursier cache
+        uses: coursier/cache-action@v6
+
+      - name: Enable jvm-opts
+        run: cp .jvmopts-ci .jvmopts
+
+      - name: Compile
+        run: sbt ++3.3 core/Test/compile
+
   test-postgres:
     name: Run test with Postgres
     runs-on: ubuntu-latest
@@ -78,6 +102,44 @@ jobs:
       - name: test
         run: sbt ++${{ matrix.SCALA_VERSION }} test
 
+  test-postgres-scala3:
+    name: Run test with Postgres (Scala 3)
+    runs-on: ubuntu-latest
+    if: github.repository == 'apache/incubator-pekko-persistence-r2dbc'
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+
+      - name: Checkout GitHub merge
+        if: github.event.pull_request
+        run: |-
+          git fetch origin pull/${{ github.event.pull_request.number 
}}/merge:scratch
+          git checkout scratch
+
+      - name: Setup Java 8
+        uses: actions/setup-java@v3
+        with:
+          distribution: temurin
+          java-version: 8
+
+      - name: Cache Coursier cache
+        uses: coursier/cache-action@v6.4.0
+
+      - name: Enable jvm-opts
+        run: cp .jvmopts-ci .jvmopts
+
+      - name: Start DB
+        run: |-
+          docker-compose -f docker/docker-compose-postgres.yml up -d
+          # TODO: could we poll the port instead of sleep?
+          sleep 10
+          docker exec -i docker_postgres-db_1 psql -U postgres -t < 
ddl-scripts/create_tables_postgres.sql
+
+      - name: test
+        run: sbt ++3.3 core/test
+
   test-yugabyte:
     name: Run tests with Yugabyte
     runs-on: ubuntu-latest
diff --git a/build.sbt b/build.sbt
index 4d56056..0fededa 100644
--- a/build.sbt
+++ b/build.sbt
@@ -62,12 +62,17 @@ def suffixFileFilter(suffix: String): FileFilter = new 
SimpleFileFilter(f => f.g
 
 lazy val core = (project in file("core"))
   .enablePlugins(ReproducibleBuildsPlugin)
-  .settings(name := "pekko-persistence-r2dbc", libraryDependencies ++= 
Dependencies.core)
+  .settings(
+    name := "pekko-persistence-r2dbc",
+    crossScalaVersions += Dependencies.Scala3,
+    libraryDependencies ++= Dependencies.core)
 
 lazy val projection = (project in file("projection"))
   .dependsOn(core)
   .enablePlugins(ReproducibleBuildsPlugin)
-  .settings(name := "pekko-projection-r2dbc", libraryDependencies ++= 
Dependencies.projection)
+  .settings(
+    name := "pekko-projection-r2dbc",
+    libraryDependencies ++= Dependencies.projection)
 
 lazy val migration = (project in file("migration"))
   .enablePlugins(ReproducibleBuildsPlugin)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index 64544aa..8ca0198 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -65,7 +65,7 @@ private[r2dbc] object JournalDao {
       case metaPayload =>
         Some(
           SerializedEventMetadata(
-            serId = row.get("meta_ser_id", classOf[Integer]),
+            serId = row.get[Integer]("meta_ser_id", classOf[Integer]),
             serManifest = row.get("meta_ser_manifest", classOf[String]),
             metaPayload))
     }
@@ -243,7 +243,7 @@ private[r2dbc] class JournalDao(journalSettings: 
R2dbcSettings, connectionFactor
             .bind(0, persistenceId)
             .bind(1, fromSequenceNr),
         row => {
-          val seqNr = row.get(0, classOf[java.lang.Long])
+          val seqNr = row.get[java.lang.Long](0, classOf[java.lang.Long])
           if (seqNr eq null) 0L else seqNr.longValue
         })
       .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/R2dbcReadJournalProvider.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/R2dbcReadJournalProvider.scala
index 1da1a07..d2d30dd 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/R2dbcReadJournalProvider.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/R2dbcReadJournalProvider.scala
@@ -20,8 +20,12 @@ import com.typesafe.config.Config
 
 final class R2dbcReadJournalProvider(system: ExtendedActorSystem, config: 
Config, cfgPath: String)
     extends ReadJournalProvider {
-  override val scaladslReadJournal: scaladsl.R2dbcReadJournal =
+  private val readJournalScala: scaladsl.R2dbcReadJournal =
     new scaladsl.R2dbcReadJournal(system, config, cfgPath)
 
-  override val javadslReadJournal: javadsl.R2dbcReadJournal = new 
javadsl.R2dbcReadJournal(scaladslReadJournal)
+  private val readJournalJava: javadsl.R2dbcReadJournal = new 
javadsl.R2dbcReadJournal(readJournalScala)
+
+  override def scaladslReadJournal(): scaladsl.R2dbcReadJournal = 
readJournalScala
+
+  override def javadslReadJournal(): javadsl.R2dbcReadJournal = readJournalJava
 }
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index d1e1544..0fff879 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -179,10 +179,10 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, 
connectionFactory: Connec
       row =>
         if (backtracking)
           SerializedJournalRow(
-            slice = row.get("slice", classOf[Integer]),
+            slice = row.get[Integer]("slice", classOf[Integer]),
             entityType,
             persistenceId = row.get("persistence_id", classOf[String]),
-            seqNr = row.get("seq_nr", classOf[java.lang.Long]),
+            seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
             dbTimestamp = row.get("db_timestamp", classOf[Instant]),
             readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
             payload = None, // lazy loaded for backtracking
@@ -193,14 +193,14 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, 
connectionFactory: Connec
             metadata = None)
         else
           SerializedJournalRow(
-            slice = row.get("slice", classOf[Integer]),
+            slice = row.get[Integer]("slice", classOf[Integer]),
             entityType,
             persistenceId = row.get("persistence_id", classOf[String]),
-            seqNr = row.get("seq_nr", classOf[java.lang.Long]),
+            seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
             dbTimestamp = row.get("db_timestamp", classOf[Instant]),
             readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
             payload = Some(row.get("event_payload", classOf[Array[Byte]])),
-            serId = row.get("event_ser_id", classOf[Integer]),
+            serId = row.get[Integer]("event_ser_id", classOf[Integer]),
             serManifest = row.get("event_ser_manifest", classOf[String]),
             writerUuid = "", // not need in this query
             tags = Set.empty, // tags not fetched in queries (yet)
@@ -241,8 +241,8 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, 
connectionFactory: Connec
           .bind(2, toTimestamp)
           .bind(3, limit),
       row => {
-        val bucketStartEpochSeconds = row.get("bucket", 
classOf[java.lang.Long]).toLong * 10
-        val count = row.get("count", classOf[java.lang.Long]).toLong
+        val bucketStartEpochSeconds = row.get[java.lang.Long]("bucket", 
classOf[java.lang.Long]) * 10
+        val count = row.get[java.lang.Long]("count", classOf[java.lang.Long])
         Bucket(bucketStartEpochSeconds, count)
       })
 
@@ -279,14 +279,14 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, 
connectionFactory: Connec
           .bind(1, seqNr),
       row =>
         SerializedJournalRow(
-          slice = row.get("slice", classOf[Integer]),
+          slice = row.get[Integer]("slice", classOf[Integer]),
           entityType = row.get("entity_type", classOf[String]),
           persistenceId,
           seqNr,
           dbTimestamp = row.get("db_timestamp", classOf[Instant]),
           readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
           payload = Some(row.get("event_payload", classOf[Array[Byte]])),
-          serId = row.get("event_ser_id", classOf[Integer]),
+          serId = row.get[Integer]("event_ser_id", classOf[Integer]),
           serManifest = row.get("event_ser_manifest", classOf[String]),
           writerUuid = "", // not need in this query
           tags = Set.empty, // tags not fetched in queries (yet)
@@ -307,14 +307,14 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, 
connectionFactory: Connec
           .bind(3, settings.querySettings.bufferSize),
       row =>
         SerializedJournalRow(
-          slice = row.get("slice", classOf[Integer]),
+          slice = row.get[Integer]("slice", classOf[Integer]),
           entityType = row.get("entity_type", classOf[String]),
           persistenceId = row.get("persistence_id", classOf[String]),
-          seqNr = row.get("seq_nr", classOf[java.lang.Long]),
+          seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
           dbTimestamp = row.get("db_timestamp", classOf[Instant]),
           readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
           payload = Some(row.get("event_payload", classOf[Array[Byte]])),
-          serId = row.get("event_ser_id", classOf[Integer]),
+          serId = row.get[Integer]("event_ser_id", classOf[Integer]),
           serManifest = row.get("event_ser_manifest", classOf[String]),
           writerUuid = row.get("writer", classOf[String]),
           tags = Set.empty, // tags not fetched in queries (yet)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
index 55fcd26..16968b3 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
@@ -51,12 +51,12 @@ private[r2dbc] object SnapshotDao {
   private def collectSerializedSnapshot(row: Row): SerializedSnapshotRow =
     SerializedSnapshotRow(
       row.get("persistence_id", classOf[String]),
-      row.get("seq_nr", classOf[java.lang.Long]),
-      row.get("write_timestamp", classOf[java.lang.Long]),
+      row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
+      row.get[java.lang.Long]("write_timestamp", classOf[java.lang.Long]),
       row.get("snapshot", classOf[Array[Byte]]),
-      row.get("ser_id", classOf[Integer]),
+      row.get[Integer]("ser_id", classOf[Integer]),
       row.get("ser_manifest", classOf[String]), {
-        val metaSerializerId = row.get("meta_ser_id", classOf[Integer])
+        val metaSerializerId = row.get[Integer]("meta_ser_id", 
classOf[Integer])
         if (metaSerializerId eq null) None
         else
           Some(
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index ce080ae..041369b 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -178,11 +178,11 @@ private[r2dbc] class DurableStateDao(settings: 
R2dbcSettings, connectionFactory:
       row =>
         SerializedStateRow(
           persistenceId = persistenceId,
-          revision = row.get("revision", classOf[java.lang.Long]),
+          revision = row.get[java.lang.Long]("revision", 
classOf[java.lang.Long]),
           dbTimestamp = row.get("db_timestamp", classOf[Instant]),
           readDbTimestamp = Instant.EPOCH, // not needed here
           payload = row.get("state_payload", classOf[Array[Byte]]),
-          serId = row.get("state_ser_id", classOf[Integer]),
+          serId = row.get[Integer]("state_ser_id", classOf[Integer]),
           serManifest = row.get("state_ser_manifest", classOf[String]),
           tags = Set.empty // tags not fetched in queries (yet)
         ))
@@ -325,7 +325,7 @@ private[r2dbc] class DurableStateDao(settings: 
R2dbcSettings, connectionFactory:
         if (backtracking)
           SerializedStateRow(
             persistenceId = row.get("persistence_id", classOf[String]),
-            revision = row.get("revision", classOf[java.lang.Long]),
+            revision = row.get[java.lang.Long]("revision", 
classOf[java.lang.Long]),
             dbTimestamp = row.get("db_timestamp", classOf[Instant]),
             readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
             payload = null, // lazy loaded for backtracking
@@ -336,11 +336,11 @@ private[r2dbc] class DurableStateDao(settings: 
R2dbcSettings, connectionFactory:
         else
           SerializedStateRow(
             persistenceId = row.get("persistence_id", classOf[String]),
-            revision = row.get("revision", classOf[java.lang.Long]),
+            revision = row.get[java.lang.Long]("revision", 
classOf[java.lang.Long]),
             dbTimestamp = row.get("db_timestamp", classOf[Instant]),
             readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
             payload = row.get("state_payload", classOf[Array[Byte]]),
-            serId = row.get("state_ser_id", classOf[Integer]),
+            serId = row.get[Integer]("state_ser_id", classOf[Integer]),
             serManifest = row.get("state_ser_manifest", classOf[String]),
             tags = Set.empty // tags not fetched in queries (yet)
           ))
@@ -409,8 +409,8 @@ private[r2dbc] class DurableStateDao(settings: 
R2dbcSettings, connectionFactory:
           .bind(2, toTimestamp)
           .bind(3, limit),
       row => {
-        val bucketStartEpochSeconds = row.get("bucket", 
classOf[java.lang.Long]).toLong * 10
-        val count = row.get("count", classOf[java.lang.Long]).toLong
+        val bucketStartEpochSeconds = row.get[java.lang.Long]("bucket", 
classOf[java.lang.Long]) * 10
+        val count = row.get[java.lang.Long]("count", classOf[java.lang.Long])
         Bucket(bucketStartEpochSeconds, count)
       })
 
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
index 4f97a79..7a490ed 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
@@ -73,7 +73,7 @@ class PersistTagsSpec
               }
               Row(
                 pid = row.get("persistence_id", classOf[String]),
-                seqNr = row.get("seq_nr", classOf[java.lang.Long]),
+                seqNr = row.get[java.lang.Long]("seq_nr", 
classOf[java.lang.Long]),
                 tags)
             })
           .futureValue
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
index ca4eb7e..b021abb 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
@@ -81,13 +81,13 @@ class PersistTimestampSpec
               val event = serialization
                 .deserialize(
                   row.get("event_payload", classOf[Array[Byte]]),
-                  row.get("event_ser_id", classOf[Integer]),
+                  row.get[Integer]("event_ser_id", classOf[Integer]),
                   row.get("event_ser_manifest", classOf[String]))
                 .get
                 .asInstanceOf[String]
               Row(
                 pid = row.get("persistence_id", classOf[String]),
-                seqNr = row.get("seq_nr", classOf[java.lang.Long]),
+                seqNr = row.get[java.lang.Long]("seq_nr", 
classOf[java.lang.Long]),
                 dbTimestamp = row.get("db_timestamp", classOf[Instant]),
                 event)
             })
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
index 76d7421..67c7ec8 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
@@ -57,7 +57,7 @@ class CurrentPersistenceIdsQuerySpec
   override protected def beforeAll(): Unit = {
     super.beforeAll()
 
-    val probe = createTestProbe[Done]
+    val probe = createTestProbe[Done]()
     pids.foreach { pid =>
       val persister = spawn(TestActors.Persister(pid))
       persister ! Persister.PersistWithAck("e-1", probe.ref)
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
index d6cee73..d08d2da 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
@@ -195,7 +195,7 @@ class EventsByPersistenceIdSpec
 
   "Live query" should {
     "pick up new events" in {
-      val pid = nextPid
+      val pid = nextPid()
       val persister = testKit.spawn(Persister(pid))
       val probe = testKit.createTestProbe[Done]()
       val sub = query
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index 2c1ca6c..edb726a 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -67,11 +67,11 @@ class EventsBySlicePubSubSpec
   private val query = 
PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
 
   private class Setup {
-    val entityType = nextEntityType()
-    val persistenceId = nextPid(entityType)
+    val setupEntityType = nextEntityType()
+    val persistenceId = nextPid(setupEntityType)
     val slice = query.sliceForPersistenceId(persistenceId)
     val persister = spawn(TestActors.Persister(persistenceId))
-    val probe = createTestProbe[Done]
+    val probe = createTestProbe[Done]()
     val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)
   }
 
@@ -101,11 +101,11 @@ class EventsBySlicePubSubSpec
     "publish new events" in new Setup {
 
       val result: TestSubscriber.Probe[EventEnvelope[String]] =
-        query.eventsBySlices[String](entityType, slice, slice, 
NoOffset).runWith(sinkProbe).request(10)
+        query.eventsBySlices[String](setupEntityType, slice, slice, 
NoOffset).runWith(sinkProbe).request(10)
 
       val topicStatsProbe = createTestProbe[TopicImpl.TopicStats]()
       eventually {
-        PubSub(typedSystem).eventTopic[String](entityType, slice) ! 
TopicImpl.GetTopicStats(topicStatsProbe.ref)
+        PubSub(typedSystem).eventTopic[String](setupEntityType, slice) ! 
TopicImpl.GetTopicStats(topicStatsProbe.ref)
         topicStatsProbe.receiveMessage().localSubscriberCount shouldBe 1
       }
 
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
index 4a84698..ba2fdd8 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
@@ -92,7 +92,7 @@ class EventsBySliceSpec
     val persistenceId = nextPid(entityType)
     val slice = query.sliceForPersistenceId(persistenceId)
     val persister = spawn(TestActors.Persister(persistenceId))
-    val probe = createTestProbe[Done]
+    val probe = createTestProbe[Done]()
     val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)
   }
 
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
index 6d0be4c..d0df18b 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
@@ -58,7 +58,7 @@ class CurrentPersistenceIdsQuerySpec
   override protected def beforeAll(): Unit = {
     super.beforeAll()
 
-    val probe = createTestProbe[Done]
+    val probe = createTestProbe[Done]()
     pids.foreach { pid =>
       val persister = spawn(TestActors.DurableStatePersister(pid))
       persister ! DurableStatePersister.PersistWithAck("s-1", probe.ref)
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
index 4e556cb..a344643 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
@@ -77,11 +77,11 @@ class DurableStateBySliceSpec
     
.durableStateStoreFor[R2dbcDurableStateStore[String]](R2dbcDurableStateStore.Identifier)
 
   private class Setup {
-    val entityType = nextEntityType
+    val entityType = nextEntityType()
     val persistenceId = nextPid(entityType)
     val slice = query.sliceForPersistenceId(persistenceId)
     val persister = spawn(TestActors.DurableStatePersister(persistenceId))
-    val probe = createTestProbe[Done]
+    val probe = createTestProbe[Done]()
     val updatedDurableStateProbe = 
createTestProbe[UpdatedDurableState[String]]()
     val killSwitch = KillSwitches.shared("test")
   }
diff --git 
a/docs/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala 
b/docs/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
index dce41b2..c8150e4 100644
--- a/docs/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
+++ b/docs/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
@@ -109,7 +109,7 @@ object R2dbcProjectionDocExample {
   }
   // #grouped-handler
 
-  implicit val system = ActorSystem[Nothing](Behaviors.empty, "Example")
+  implicit val system: ActorSystem[Nothing] = 
ActorSystem[Nothing](Behaviors.empty, "Example")
   implicit val ec: ExecutionContext = system.executionContext
 
   object IllustrateInit {
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 687a2d8..5745b51 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -16,6 +16,7 @@ import sbt._
 object Dependencies {
   val Scala212 = "2.12.18"
   val Scala213 = "2.13.11"
+  val Scala3 = "3.3.0"
   val PekkoVersion = System.getProperty("override.pekko.version", "1.0.1")
   val PekkoVersionInDocs = "current"
   val PekkoPersistenceJdbcVersion = "0.0.0+998-6a9e5841-SNAPSHOT"
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
index 76e333b..4bf106d 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
@@ -63,7 +63,7 @@ class R2dbcOffsetStoreSpec
             .createStatement(selectLastSql)
             .bind(0, projectionId.name)
             .bind(1, projectionId.key),
-        row => Instant.ofEpochMilli(row.get("last_updated", 
classOf[java.lang.Long])))
+        row => Instant.ofEpochMilli(row.get[java.lang.Long]("last_updated", 
classOf[java.lang.Long])))
       .futureValue
       .getOrElse(throw new RuntimeException(s"no records found for 
$projectionId"))
   }
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
index 5afe017..0560568 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
@@ -268,9 +268,9 @@ class R2dbcProjectionSpec
   "A R2DBC exactly-once projection" must {
 
     "persist projection and offset in the same write operation 
(transactional)" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val projection =
         R2dbcProjection.exactlyOnce(
@@ -287,9 +287,9 @@ class R2dbcProjectionSpec
     }
 
     "skip failing events when using RecoveryStrategy.skip" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val bogusEventHandler = new ConcatHandler(_ == 4)
 
@@ -310,9 +310,9 @@ class R2dbcProjectionSpec
     }
 
     "store offset for failing events when using RecoveryStrategy.skip" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val bogusEventHandler = new ConcatHandler(_ == 6)
 
@@ -333,9 +333,9 @@ class R2dbcProjectionSpec
     }
 
     "skip failing events after retrying when using 
RecoveryStrategy.retryAndSkip" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val bogusEventHandler = new ConcatHandler(_ == 4)
 
@@ -380,9 +380,9 @@ class R2dbcProjectionSpec
     }
 
     "fail after retrying when using RecoveryStrategy.retryAndFail" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val bogusEventHandler = new ConcatHandler(_ == 4)
 
@@ -407,9 +407,9 @@ class R2dbcProjectionSpec
     }
 
     "restart from previous offset - fail with throwing an exception" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       def exactlyOnceProjection(failWhenOffset: Long => Boolean = _ => false) 
= {
         R2dbcProjection.exactlyOnce(
@@ -435,9 +435,9 @@ class R2dbcProjectionSpec
     }
 
     "restart from previous offset - fail with bad insert on user code" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val bogusEventHandler = new R2dbcHandler[Envelope] {
         override def process(session: R2dbcSession, envelope: Envelope): 
Future[Done] = {
@@ -473,9 +473,9 @@ class R2dbcProjectionSpec
 
     "verify offsets before and after processing an envelope" in {
 
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       case class ProbeMessage(str: String, offset: Long)
       val verificationProbe = createTestProbe[ProbeMessage]("verification")
@@ -515,9 +515,9 @@ class R2dbcProjectionSpec
     }
 
     "skip record if offset verification fails before processing envelope" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val testVerification = (offset: Long) => {
         if (offset == 3L)
@@ -541,9 +541,9 @@ class R2dbcProjectionSpec
     }
 
     "skip record if offset verification fails after processing envelope" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val testVerification = (offset: Long) => {
         if (offset == 3L)
@@ -569,9 +569,9 @@ class R2dbcProjectionSpec
 
   "A R2DBC grouped projection" must {
     "persist projection and offset in the same write operation 
(transactional)" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val handlerCalled = "called"
       val handlerProbe = createTestProbe[String]("calls-to-handler")
@@ -612,9 +612,9 @@ class R2dbcProjectionSpec
     }
 
     "handle grouped async projection and store offset" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val result = new StringBuffer()
 
@@ -648,9 +648,9 @@ class R2dbcProjectionSpec
   "A R2DBC at-least-once projection" must {
 
     "persist projection and offset" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val projection =
         R2dbcProjection.atLeastOnce(
@@ -669,9 +669,9 @@ class R2dbcProjectionSpec
     }
 
     "skip failing events when using RecoveryStrategy.skip, save after 1" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val projection =
         R2dbcProjection
@@ -693,9 +693,9 @@ class R2dbcProjectionSpec
     }
 
     "skip failing events when using RecoveryStrategy.skip, save after 2" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val projection =
         R2dbcProjection
@@ -717,9 +717,9 @@ class R2dbcProjectionSpec
     }
 
     "restart from previous offset - handler throwing an exception, save after 
1" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       def atLeastOnceProjection(failWhenOffset: Long => Boolean = _ => false) =
         R2dbcProjection
@@ -762,9 +762,9 @@ class R2dbcProjectionSpec
     }
 
     "restart from previous offset - handler throwing an exception, save after 
2" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       offsetStore.readOffset[Long]().futureValue shouldBe empty
 
@@ -813,9 +813,9 @@ class R2dbcProjectionSpec
     }
 
     "save offset after number of elements" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       import pekko.actor.typed.scaladsl.adapter._
       val sourceProbe = new AtomicReference[TestPublisher.Probe[Envelope]]()
@@ -858,9 +858,9 @@ class R2dbcProjectionSpec
     }
 
     "save offset after idle duration" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       import pekko.actor.typed.scaladsl.adapter._
       val sourceProbe = new AtomicReference[TestPublisher.Probe[Envelope]]()
@@ -904,9 +904,9 @@ class R2dbcProjectionSpec
     }
 
     "verify offsets before processing an envelope" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
       val verifiedProbe = createTestProbe[Long]()
 
       val testVerification = (offset: Long) => {
@@ -934,9 +934,9 @@ class R2dbcProjectionSpec
     }
 
     "skip record if offset verification fails before processing envelope" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val testVerification = (offset: Long) => {
         if (offset == 3L)
@@ -961,9 +961,9 @@ class R2dbcProjectionSpec
     }
 
     "handle async projection and store offset" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val result = new StringBuffer()
 
@@ -994,9 +994,9 @@ class R2dbcProjectionSpec
   "A R2DBC flow projection" must {
 
     "persist projection and offset" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       offsetShouldBeEmpty()
 
@@ -1072,9 +1072,9 @@ class R2dbcProjectionSpec
     }
 
     "call start and stop of the handler" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val handlerProbe = createTestProbe[String]()
       val handler = new LifecycleHandler(handlerProbe.ref)
@@ -1110,9 +1110,9 @@ class R2dbcProjectionSpec
     }
 
     "call start and stop of the handler when using TestKit.runWithTestSink" in 
{
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val handlerProbe = createTestProbe[String]()
       val handler = new LifecycleHandler(handlerProbe.ref)
@@ -1149,9 +1149,9 @@ class R2dbcProjectionSpec
     }
 
     "call start and stop of handler when restarted" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val handlerProbe = createTestProbe[String]()
       @volatile var _handler: Option[LifecycleHandler] = None
@@ -1220,9 +1220,9 @@ class R2dbcProjectionSpec
     }
 
     "call start and stop of handler when failed but no restart" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val handlerProbe = createTestProbe[String]()
       val failOnceOnOffset = new AtomicInteger(4)
@@ -1248,9 +1248,9 @@ class R2dbcProjectionSpec
     }
 
     "be able to stop when retrying" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val handlerProbe = createTestProbe[String]()
       val handler = new LifecycleHandler(handlerProbe.ref, alwaysFailOnOffset 
= 4)
@@ -1280,9 +1280,9 @@ class R2dbcProjectionSpec
   "R2dbcProjection management" must {
 
     "restart from beginning when offset is cleared" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val projection =
         R2dbcProjection
@@ -1312,9 +1312,9 @@ class R2dbcProjectionSpec
     }
 
     "restart from updated offset" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val projection =
         R2dbcProjection
@@ -1345,9 +1345,9 @@ class R2dbcProjectionSpec
     }
 
     "pause projection" in {
-      implicit val entityId = UUID.randomUUID().toString
+      implicit val entityId: String = UUID.randomUUID().toString
       val projectionId = genRandomProjectionId()
-      implicit val offsetStore = createOffsetStore(projectionId)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId)
 
       val projection =
         R2dbcProjection
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
index 61007e1..163b5ee 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
@@ -376,7 +376,7 @@ class R2dbcTimestampOffsetProjectionSpec
 
       val envelopes = createEnvelopes(pid, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val projection =
         R2dbcProjection.exactlyOnce(projectionId, Some(settings), 
sourceProvider, handler = () => new ConcatHandler)
@@ -393,7 +393,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid1, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val bogusEventHandler = new ConcatHandler(_.sequenceNr == 4)
 
@@ -414,7 +414,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid1, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val bogusEventHandler = new ConcatHandler(_.sequenceNr == 6)
 
@@ -435,7 +435,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid1, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val statusProbe = createTestProbe[TestStatusObserver.Status]()
       val progressProbe = 
createTestProbe[TestStatusObserver.OffsetProgress[Envelope]]()
@@ -478,7 +478,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid1, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val bogusEventHandler = new ConcatHandler(_.sequenceNr == 4)
 
@@ -504,7 +504,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       def exactlyOnceProjection(failWhen: EventEnvelope[String] => Boolean = _ 
=> false) = {
         R2dbcProjection.exactlyOnce(
@@ -535,7 +535,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val projection =
         R2dbcProjection.exactlyOnce(projectionId, Some(settings), 
sourceProvider, handler = () => new ConcatHandler)
@@ -555,7 +555,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val startTime = Instant.now()
       val envelopes1 = createEnvelopesUnknownSequenceNumbers(startTime, pid1, 
pid2)
       val sourceProvider1 = createSourceProvider(envelopes1)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider1)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider1)
 
       val projection1 =
         R2dbcProjection.exactlyOnce(projectionId, Some(settings), 
sourceProvider1, handler = () => new ConcatHandler)
@@ -591,7 +591,7 @@ class R2dbcTimestampOffsetProjectionSpec
           env
       }
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val projection =
         R2dbcProjection.exactlyOnce(projectionId, Some(settings), 
sourceProvider, handler = () => new ConcatHandler)
@@ -611,7 +611,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val handlerProbe = createTestProbe[String]("calls-to-handler")
 
@@ -638,7 +638,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val handlerProbe = createTestProbe[String]("calls-to-handler")
 
@@ -667,7 +667,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val startTime = Instant.now()
       val envelopes1 = createEnvelopesUnknownSequenceNumbers(startTime, pid1, 
pid2)
       val sourceProvider1 = createBacktrackingSourceProvider(envelopes1)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider1)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider1)
 
       val handlerProbe = createTestProbe[String]()
 
@@ -716,7 +716,7 @@ class R2dbcTimestampOffsetProjectionSpec
           env
       }
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val handlerProbe = createTestProbe[String]("calls-to-handler")
 
@@ -742,7 +742,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val result = new StringBuffer()
 
@@ -776,7 +776,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val result1 = new StringBuffer()
       val result2 = new StringBuffer()
@@ -815,7 +815,7 @@ class R2dbcTimestampOffsetProjectionSpec
 
       val startTime = Instant.now()
       val sourceProvider = new TestSourceProviderWithInput()
-      implicit val offsetStore =
+      implicit val offsetStore: R2dbcOffsetStore =
         new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, 
settings, r2dbcExecutor)
 
       val result1 = new StringBuffer()
@@ -875,7 +875,7 @@ class R2dbcTimestampOffsetProjectionSpec
           env
       }
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val result = new StringBuffer()
 
@@ -911,7 +911,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val projection =
         R2dbcProjection.atLeastOnce(projectionId, Some(settings), 
sourceProvider, handler = () => new ConcatHandler)
@@ -931,7 +931,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val projection =
         R2dbcProjection.atLeastOnce(projectionId, Some(settings), 
sourceProvider, handler = () => new ConcatHandler)
@@ -952,7 +952,7 @@ class R2dbcTimestampOffsetProjectionSpec
 
       val startTime = Instant.now()
       val sourceProvider = new TestSourceProviderWithInput()
-      implicit val offsetStore =
+      implicit val offsetStore: R2dbcOffsetStore =
         new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, 
settings, r2dbcExecutor)
 
       val projectionRef = spawn(
@@ -989,7 +989,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid1, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val failOnce = new AtomicBoolean(true)
       val failPredicate = (ev: EventEnvelope[String]) => {
@@ -1026,7 +1026,7 @@ class R2dbcTimestampOffsetProjectionSpec
           env
       }
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val projection =
         R2dbcProjection.atLeastOnce(projectionId, Some(settings), 
sourceProvider, handler = () => new ConcatHandler)
@@ -1045,7 +1045,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val result = new StringBuffer()
 
@@ -1075,7 +1075,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid1, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val failOnce = new AtomicBoolean(true)
       val failPredicate = (ev: EventEnvelope[String]) => {
@@ -1120,7 +1120,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val result1 = new StringBuffer()
       val result2 = new StringBuffer()
@@ -1156,7 +1156,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val startTime = Instant.now()
       val sourceProvider = new TestSourceProviderWithInput()
-      implicit val offsetStore =
+      implicit val offsetStore: R2dbcOffsetStore =
         new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, 
settings, r2dbcExecutor)
 
       val result1 = new StringBuffer()
@@ -1216,7 +1216,7 @@ class R2dbcTimestampOffsetProjectionSpec
       }
 
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val projection =
         R2dbcProjection.atLeastOnce(projectionId, Some(settings), 
sourceProvider, handler = () => new ConcatHandler)
@@ -1238,7 +1238,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       offsetShouldBeEmpty()
 
@@ -1267,7 +1267,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       info(s"pid1 [$pid1], pid2 [$pid2]")
 
@@ -1298,7 +1298,7 @@ class R2dbcTimestampOffsetProjectionSpec
 
       val startTime = Instant.now()
       val sourceProvider = new TestSourceProviderWithInput()
-      implicit val offsetStore =
+      implicit val offsetStore: R2dbcOffsetStore =
         new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, 
settings, r2dbcExecutor)
 
       val flowHandler =
@@ -1351,7 +1351,7 @@ class R2dbcTimestampOffsetProjectionSpec
           env
       }
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       offsetShouldBeEmpty()
 
@@ -1383,7 +1383,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val projection =
         R2dbcProjection
@@ -1417,7 +1417,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6)
       val sourceProvider = createSourceProvider(envelopes)
-      implicit val offsetStore = createOffsetStore(projectionId, 
sourceProvider)
+      implicit val offsetStore: R2dbcOffsetStore = 
createOffsetStore(projectionId, sourceProvider)
 
       val projection =
         R2dbcProjection


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to