This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 1805179 scala3 support for core module (#52)
1805179 is described below
commit 1805179e1b9e1f22a2ba25188e465fb87fb8ea4c
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Sep 1 17:59:21 2023 +0100
scala3 support for core module (#52)
---
.github/workflows/build-test.yml | 64 ++++++++++-
build.sbt | 9 +-
.../persistence/r2dbc/journal/JournalDao.scala | 4 +-
.../r2dbc/query/R2dbcReadJournalProvider.scala | 8 +-
.../r2dbc/query/scaladsl/QueryDao.scala | 24 ++--
.../persistence/r2dbc/snapshot/SnapshotDao.scala | 8 +-
.../r2dbc/state/scaladsl/DurableStateDao.scala | 14 +--
.../r2dbc/journal/PersistTagsSpec.scala | 2 +-
.../r2dbc/journal/PersistTimestampSpec.scala | 4 +-
.../query/CurrentPersistenceIdsQuerySpec.scala | 2 +-
.../r2dbc/query/EventsByPersistenceIdSpec.scala | 2 +-
.../r2dbc/query/EventsBySlicePubSubSpec.scala | 10 +-
.../r2dbc/query/EventsBySliceSpec.scala | 2 +-
.../state/CurrentPersistenceIdsQuerySpec.scala | 2 +-
.../r2dbc/state/DurableStateBySliceSpec.scala | 4 +-
.../projection/R2dbcProjectionDocExample.scala | 2 +-
project/Dependencies.scala | 1 +
.../projection/r2dbc/R2dbcOffsetStoreSpec.scala | 2 +-
.../projection/r2dbc/R2dbcProjectionSpec.scala | 124 ++++++++++-----------
.../r2dbc/R2dbcTimestampOffsetProjectionSpec.scala | 66 +++++------
20 files changed, 213 insertions(+), 141 deletions(-)
diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index 5ee9af3..23e9ec9 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -9,7 +9,7 @@ on:
jobs:
compile:
- name: Test and compile
+ name: Test and compile (scala 2.12/2.13)
runs-on: ubuntu-latest
strategy:
matrix:
@@ -36,6 +36,30 @@ jobs:
- name: Compile and test for JDK ${{ matrix.JAVA_VERSION }}, Scala ${{
matrix.SCALA_VERSION }}
run: sbt ++${{ matrix.SCALA_VERSION }} test:compile
+ compile3:
+ name: Test and compile (scala 3)
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ with:
+ fetch-depth: 0
+
+ - name: Setup Java 8
+ uses: actions/setup-java@v3
+ with:
+ distribution: temurin
+ java-version: 8
+
+ - name: Cache Coursier cache
+ uses: coursier/cache-action@v6
+
+ - name: Enable jvm-opts
+ run: cp .jvmopts-ci .jvmopts
+
+ - name: Compile
+ run: sbt ++3.3 core/Test/compile
+
test-postgres:
name: Run test with Postgres
runs-on: ubuntu-latest
@@ -78,6 +102,44 @@ jobs:
- name: test
run: sbt ++${{ matrix.SCALA_VERSION }} test
+ test-postgres-scala3:
+ name: Run test with Postgres (Scala 3)
+ runs-on: ubuntu-latest
+ if: github.repository == 'apache/incubator-pekko-persistence-r2dbc'
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ with:
+ fetch-depth: 0
+
+ - name: Checkout GitHub merge
+ if: github.event.pull_request
+ run: |-
+ git fetch origin pull/${{ github.event.pull_request.number
}}/merge:scratch
+ git checkout scratch
+
+ - name: Setup Java 8
+ uses: actions/setup-java@v3
+ with:
+ distribution: temurin
+ java-version: 8
+
+ - name: Cache Coursier cache
+ uses: coursier/cache-action@v6
+
+ - name: Enable jvm-opts
+ run: cp .jvmopts-ci .jvmopts
+
+ - name: Start DB
+ run: |-
+ docker-compose -f docker/docker-compose-postgres.yml up -d
+ # TODO: could we poll the port instead of sleep?
+ sleep 10
+ docker exec -i docker_postgres-db_1 psql -U postgres -t <
ddl-scripts/create_tables_postgres.sql
+
+ - name: test
+ run: sbt ++3.3 core/test
+
test-yugabyte:
name: Run tests with Yugabyte
runs-on: ubuntu-latest
diff --git a/build.sbt b/build.sbt
index 4d56056..0fededa 100644
--- a/build.sbt
+++ b/build.sbt
@@ -62,12 +62,17 @@ def suffixFileFilter(suffix: String): FileFilter = new
SimpleFileFilter(f => f.g
lazy val core = (project in file("core"))
.enablePlugins(ReproducibleBuildsPlugin)
- .settings(name := "pekko-persistence-r2dbc", libraryDependencies ++=
Dependencies.core)
+ .settings(
+ name := "pekko-persistence-r2dbc",
+ crossScalaVersions += Dependencies.Scala3,
+ libraryDependencies ++= Dependencies.core)
lazy val projection = (project in file("projection"))
.dependsOn(core)
.enablePlugins(ReproducibleBuildsPlugin)
- .settings(name := "pekko-projection-r2dbc", libraryDependencies ++=
Dependencies.projection)
+ .settings(
+ name := "pekko-projection-r2dbc",
+ libraryDependencies ++= Dependencies.projection)
lazy val migration = (project in file("migration"))
.enablePlugins(ReproducibleBuildsPlugin)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index 64544aa..8ca0198 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -65,7 +65,7 @@ private[r2dbc] object JournalDao {
case metaPayload =>
Some(
SerializedEventMetadata(
- serId = row.get("meta_ser_id", classOf[Integer]),
+ serId = row.get[Integer]("meta_ser_id", classOf[Integer]),
serManifest = row.get("meta_ser_manifest", classOf[String]),
metaPayload))
}
@@ -243,7 +243,7 @@ private[r2dbc] class JournalDao(journalSettings:
R2dbcSettings, connectionFactor
.bind(0, persistenceId)
.bind(1, fromSequenceNr),
row => {
- val seqNr = row.get(0, classOf[java.lang.Long])
+ val seqNr = row.get[java.lang.Long](0, classOf[java.lang.Long])
if (seqNr eq null) 0L else seqNr.longValue
})
.map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/R2dbcReadJournalProvider.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/R2dbcReadJournalProvider.scala
index 1da1a07..d2d30dd 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/R2dbcReadJournalProvider.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/R2dbcReadJournalProvider.scala
@@ -20,8 +20,12 @@ import com.typesafe.config.Config
final class R2dbcReadJournalProvider(system: ExtendedActorSystem, config:
Config, cfgPath: String)
extends ReadJournalProvider {
- override val scaladslReadJournal: scaladsl.R2dbcReadJournal =
+ private val readJournalScala: scaladsl.R2dbcReadJournal =
new scaladsl.R2dbcReadJournal(system, config, cfgPath)
- override val javadslReadJournal: javadsl.R2dbcReadJournal = new
javadsl.R2dbcReadJournal(scaladslReadJournal)
+ private val readJournalJava: javadsl.R2dbcReadJournal = new
javadsl.R2dbcReadJournal(readJournalScala)
+
+ override def scaladslReadJournal(): scaladsl.R2dbcReadJournal =
readJournalScala
+
+ override def javadslReadJournal(): javadsl.R2dbcReadJournal = readJournalJava
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index d1e1544..0fff879 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -179,10 +179,10 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings,
connectionFactory: Connec
row =>
if (backtracking)
SerializedJournalRow(
- slice = row.get("slice", classOf[Integer]),
+ slice = row.get[Integer]("slice", classOf[Integer]),
entityType,
persistenceId = row.get("persistence_id", classOf[String]),
- seqNr = row.get("seq_nr", classOf[java.lang.Long]),
+ seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
payload = None, // lazy loaded for backtracking
@@ -193,14 +193,14 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings,
connectionFactory: Connec
metadata = None)
else
SerializedJournalRow(
- slice = row.get("slice", classOf[Integer]),
+ slice = row.get[Integer]("slice", classOf[Integer]),
entityType,
persistenceId = row.get("persistence_id", classOf[String]),
- seqNr = row.get("seq_nr", classOf[java.lang.Long]),
+ seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
payload = Some(row.get("event_payload", classOf[Array[Byte]])),
- serId = row.get("event_ser_id", classOf[Integer]),
+ serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = "", // not need in this query
tags = Set.empty, // tags not fetched in queries (yet)
@@ -241,8 +241,8 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings,
connectionFactory: Connec
.bind(2, toTimestamp)
.bind(3, limit),
row => {
- val bucketStartEpochSeconds = row.get("bucket",
classOf[java.lang.Long]).toLong * 10
- val count = row.get("count", classOf[java.lang.Long]).toLong
+ val bucketStartEpochSeconds = row.get[java.lang.Long]("bucket",
classOf[java.lang.Long]) * 10
+ val count = row.get[java.lang.Long]("count", classOf[java.lang.Long])
Bucket(bucketStartEpochSeconds, count)
})
@@ -279,14 +279,14 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings,
connectionFactory: Connec
.bind(1, seqNr),
row =>
SerializedJournalRow(
- slice = row.get("slice", classOf[Integer]),
+ slice = row.get[Integer]("slice", classOf[Integer]),
entityType = row.get("entity_type", classOf[String]),
persistenceId,
seqNr,
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
payload = Some(row.get("event_payload", classOf[Array[Byte]])),
- serId = row.get("event_ser_id", classOf[Integer]),
+ serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = "", // not need in this query
tags = Set.empty, // tags not fetched in queries (yet)
@@ -307,14 +307,14 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings,
connectionFactory: Connec
.bind(3, settings.querySettings.bufferSize),
row =>
SerializedJournalRow(
- slice = row.get("slice", classOf[Integer]),
+ slice = row.get[Integer]("slice", classOf[Integer]),
entityType = row.get("entity_type", classOf[String]),
persistenceId = row.get("persistence_id", classOf[String]),
- seqNr = row.get("seq_nr", classOf[java.lang.Long]),
+ seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
payload = Some(row.get("event_payload", classOf[Array[Byte]])),
- serId = row.get("event_ser_id", classOf[Integer]),
+ serId = row.get[Integer]("event_ser_id", classOf[Integer]),
serManifest = row.get("event_ser_manifest", classOf[String]),
writerUuid = row.get("writer", classOf[String]),
tags = Set.empty, // tags not fetched in queries (yet)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
index 55fcd26..16968b3 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
@@ -51,12 +51,12 @@ private[r2dbc] object SnapshotDao {
private def collectSerializedSnapshot(row: Row): SerializedSnapshotRow =
SerializedSnapshotRow(
row.get("persistence_id", classOf[String]),
- row.get("seq_nr", classOf[java.lang.Long]),
- row.get("write_timestamp", classOf[java.lang.Long]),
+ row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
+ row.get[java.lang.Long]("write_timestamp", classOf[java.lang.Long]),
row.get("snapshot", classOf[Array[Byte]]),
- row.get("ser_id", classOf[Integer]),
+ row.get[Integer]("ser_id", classOf[Integer]),
row.get("ser_manifest", classOf[String]), {
- val metaSerializerId = row.get("meta_ser_id", classOf[Integer])
+ val metaSerializerId = row.get[Integer]("meta_ser_id",
classOf[Integer])
if (metaSerializerId eq null) None
else
Some(
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index ce080ae..041369b 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -178,11 +178,11 @@ private[r2dbc] class DurableStateDao(settings:
R2dbcSettings, connectionFactory:
row =>
SerializedStateRow(
persistenceId = persistenceId,
- revision = row.get("revision", classOf[java.lang.Long]),
+ revision = row.get[java.lang.Long]("revision",
classOf[java.lang.Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = Instant.EPOCH, // not needed here
payload = row.get("state_payload", classOf[Array[Byte]]),
- serId = row.get("state_ser_id", classOf[Integer]),
+ serId = row.get[Integer]("state_ser_id", classOf[Integer]),
serManifest = row.get("state_ser_manifest", classOf[String]),
tags = Set.empty // tags not fetched in queries (yet)
))
@@ -325,7 +325,7 @@ private[r2dbc] class DurableStateDao(settings:
R2dbcSettings, connectionFactory:
if (backtracking)
SerializedStateRow(
persistenceId = row.get("persistence_id", classOf[String]),
- revision = row.get("revision", classOf[java.lang.Long]),
+ revision = row.get[java.lang.Long]("revision",
classOf[java.lang.Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
payload = null, // lazy loaded for backtracking
@@ -336,11 +336,11 @@ private[r2dbc] class DurableStateDao(settings:
R2dbcSettings, connectionFactory:
else
SerializedStateRow(
persistenceId = row.get("persistence_id", classOf[String]),
- revision = row.get("revision", classOf[java.lang.Long]),
+ revision = row.get[java.lang.Long]("revision",
classOf[java.lang.Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
payload = row.get("state_payload", classOf[Array[Byte]]),
- serId = row.get("state_ser_id", classOf[Integer]),
+ serId = row.get[Integer]("state_ser_id", classOf[Integer]),
serManifest = row.get("state_ser_manifest", classOf[String]),
tags = Set.empty // tags not fetched in queries (yet)
))
@@ -409,8 +409,8 @@ private[r2dbc] class DurableStateDao(settings:
R2dbcSettings, connectionFactory:
.bind(2, toTimestamp)
.bind(3, limit),
row => {
- val bucketStartEpochSeconds = row.get("bucket",
classOf[java.lang.Long]).toLong * 10
- val count = row.get("count", classOf[java.lang.Long]).toLong
+ val bucketStartEpochSeconds = row.get[java.lang.Long]("bucket",
classOf[java.lang.Long]) * 10
+ val count = row.get[java.lang.Long]("count", classOf[java.lang.Long])
Bucket(bucketStartEpochSeconds, count)
})
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
index 4f97a79..7a490ed 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
@@ -73,7 +73,7 @@ class PersistTagsSpec
}
Row(
pid = row.get("persistence_id", classOf[String]),
- seqNr = row.get("seq_nr", classOf[java.lang.Long]),
+ seqNr = row.get[java.lang.Long]("seq_nr",
classOf[java.lang.Long]),
tags)
})
.futureValue
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
index ca4eb7e..b021abb 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
@@ -81,13 +81,13 @@ class PersistTimestampSpec
val event = serialization
.deserialize(
row.get("event_payload", classOf[Array[Byte]]),
- row.get("event_ser_id", classOf[Integer]),
+ row.get[Integer]("event_ser_id", classOf[Integer]),
row.get("event_ser_manifest", classOf[String]))
.get
.asInstanceOf[String]
Row(
pid = row.get("persistence_id", classOf[String]),
- seqNr = row.get("seq_nr", classOf[java.lang.Long]),
+ seqNr = row.get[java.lang.Long]("seq_nr",
classOf[java.lang.Long]),
dbTimestamp = row.get("db_timestamp", classOf[Instant]),
event)
})
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
index 76d7421..67c7ec8 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/CurrentPersistenceIdsQuerySpec.scala
@@ -57,7 +57,7 @@ class CurrentPersistenceIdsQuerySpec
override protected def beforeAll(): Unit = {
super.beforeAll()
- val probe = createTestProbe[Done]
+ val probe = createTestProbe[Done]()
pids.foreach { pid =>
val persister = spawn(TestActors.Persister(pid))
persister ! Persister.PersistWithAck("e-1", probe.ref)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
index d6cee73..d08d2da 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
@@ -195,7 +195,7 @@ class EventsByPersistenceIdSpec
"Live query" should {
"pick up new events" in {
- val pid = nextPid
+ val pid = nextPid()
val persister = testKit.spawn(Persister(pid))
val probe = testKit.createTestProbe[Done]()
val sub = query
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index 2c1ca6c..edb726a 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -67,11 +67,11 @@ class EventsBySlicePubSubSpec
private val query =
PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
private class Setup {
- val entityType = nextEntityType()
- val persistenceId = nextPid(entityType)
+ val setupEntityType = nextEntityType()
+ val persistenceId = nextPid(setupEntityType)
val slice = query.sliceForPersistenceId(persistenceId)
val persister = spawn(TestActors.Persister(persistenceId))
- val probe = createTestProbe[Done]
+ val probe = createTestProbe[Done]()
val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)
}
@@ -101,11 +101,11 @@ class EventsBySlicePubSubSpec
"publish new events" in new Setup {
val result: TestSubscriber.Probe[EventEnvelope[String]] =
- query.eventsBySlices[String](entityType, slice, slice,
NoOffset).runWith(sinkProbe).request(10)
+ query.eventsBySlices[String](setupEntityType, slice, slice,
NoOffset).runWith(sinkProbe).request(10)
val topicStatsProbe = createTestProbe[TopicImpl.TopicStats]()
eventually {
- PubSub(typedSystem).eventTopic[String](entityType, slice) !
TopicImpl.GetTopicStats(topicStatsProbe.ref)
+ PubSub(typedSystem).eventTopic[String](setupEntityType, slice) !
TopicImpl.GetTopicStats(topicStatsProbe.ref)
topicStatsProbe.receiveMessage().localSubscriberCount shouldBe 1
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
index 4a84698..ba2fdd8 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
@@ -92,7 +92,7 @@ class EventsBySliceSpec
val persistenceId = nextPid(entityType)
val slice = query.sliceForPersistenceId(persistenceId)
val persister = spawn(TestActors.Persister(persistenceId))
- val probe = createTestProbe[Done]
+ val probe = createTestProbe[Done]()
val sinkProbe = TestSink.probe[EventEnvelope[String]](system.classicSystem)
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
index 6d0be4c..d0df18b 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala
@@ -58,7 +58,7 @@ class CurrentPersistenceIdsQuerySpec
override protected def beforeAll(): Unit = {
super.beforeAll()
- val probe = createTestProbe[Done]
+ val probe = createTestProbe[Done]()
pids.foreach { pid =>
val persister = spawn(TestActors.DurableStatePersister(pid))
persister ! DurableStatePersister.PersistWithAck("s-1", probe.ref)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
index 4e556cb..a344643 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
@@ -77,11 +77,11 @@ class DurableStateBySliceSpec
.durableStateStoreFor[R2dbcDurableStateStore[String]](R2dbcDurableStateStore.Identifier)
private class Setup {
- val entityType = nextEntityType
+ val entityType = nextEntityType()
val persistenceId = nextPid(entityType)
val slice = query.sliceForPersistenceId(persistenceId)
val persister = spawn(TestActors.DurableStatePersister(persistenceId))
- val probe = createTestProbe[Done]
+ val probe = createTestProbe[Done]()
val updatedDurableStateProbe =
createTestProbe[UpdatedDurableState[String]]()
val killSwitch = KillSwitches.shared("test")
}
diff --git
a/docs/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
b/docs/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
index dce41b2..c8150e4 100644
--- a/docs/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
+++ b/docs/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
@@ -109,7 +109,7 @@ object R2dbcProjectionDocExample {
}
// #grouped-handler
- implicit val system = ActorSystem[Nothing](Behaviors.empty, "Example")
+ implicit val system: ActorSystem[Nothing] =
ActorSystem[Nothing](Behaviors.empty, "Example")
implicit val ec: ExecutionContext = system.executionContext
object IllustrateInit {
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 687a2d8..5745b51 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -16,6 +16,7 @@ import sbt._
object Dependencies {
val Scala212 = "2.12.18"
val Scala213 = "2.13.11"
+ val Scala3 = "3.3.0"
val PekkoVersion = System.getProperty("override.pekko.version", "1.0.1")
val PekkoVersionInDocs = "current"
val PekkoPersistenceJdbcVersion = "0.0.0+998-6a9e5841-SNAPSHOT"
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
index 76e333b..4bf106d 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala
@@ -63,7 +63,7 @@ class R2dbcOffsetStoreSpec
.createStatement(selectLastSql)
.bind(0, projectionId.name)
.bind(1, projectionId.key),
- row => Instant.ofEpochMilli(row.get("last_updated",
classOf[java.lang.Long])))
+ row => Instant.ofEpochMilli(row.get[java.lang.Long]("last_updated",
classOf[java.lang.Long])))
.futureValue
.getOrElse(throw new RuntimeException(s"no records found for
$projectionId"))
}
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
index 5afe017..0560568 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala
@@ -268,9 +268,9 @@ class R2dbcProjectionSpec
"A R2DBC exactly-once projection" must {
"persist projection and offset in the same write operation
(transactional)" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val projection =
R2dbcProjection.exactlyOnce(
@@ -287,9 +287,9 @@ class R2dbcProjectionSpec
}
"skip failing events when using RecoveryStrategy.skip" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val bogusEventHandler = new ConcatHandler(_ == 4)
@@ -310,9 +310,9 @@ class R2dbcProjectionSpec
}
"store offset for failing events when using RecoveryStrategy.skip" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val bogusEventHandler = new ConcatHandler(_ == 6)
@@ -333,9 +333,9 @@ class R2dbcProjectionSpec
}
"skip failing events after retrying when using
RecoveryStrategy.retryAndSkip" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val bogusEventHandler = new ConcatHandler(_ == 4)
@@ -380,9 +380,9 @@ class R2dbcProjectionSpec
}
"fail after retrying when using RecoveryStrategy.retryAndFail" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val bogusEventHandler = new ConcatHandler(_ == 4)
@@ -407,9 +407,9 @@ class R2dbcProjectionSpec
}
"restart from previous offset - fail with throwing an exception" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
def exactlyOnceProjection(failWhenOffset: Long => Boolean = _ => false)
= {
R2dbcProjection.exactlyOnce(
@@ -435,9 +435,9 @@ class R2dbcProjectionSpec
}
"restart from previous offset - fail with bad insert on user code" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val bogusEventHandler = new R2dbcHandler[Envelope] {
override def process(session: R2dbcSession, envelope: Envelope):
Future[Done] = {
@@ -473,9 +473,9 @@ class R2dbcProjectionSpec
"verify offsets before and after processing an envelope" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
case class ProbeMessage(str: String, offset: Long)
val verificationProbe = createTestProbe[ProbeMessage]("verification")
@@ -515,9 +515,9 @@ class R2dbcProjectionSpec
}
"skip record if offset verification fails before processing envelope" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val testVerification = (offset: Long) => {
if (offset == 3L)
@@ -541,9 +541,9 @@ class R2dbcProjectionSpec
}
"skip record if offset verification fails after processing envelope" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val testVerification = (offset: Long) => {
if (offset == 3L)
@@ -569,9 +569,9 @@ class R2dbcProjectionSpec
"A R2DBC grouped projection" must {
"persist projection and offset in the same write operation
(transactional)" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val handlerCalled = "called"
val handlerProbe = createTestProbe[String]("calls-to-handler")
@@ -612,9 +612,9 @@ class R2dbcProjectionSpec
}
"handle grouped async projection and store offset" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val result = new StringBuffer()
@@ -648,9 +648,9 @@ class R2dbcProjectionSpec
"A R2DBC at-least-once projection" must {
"persist projection and offset" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val projection =
R2dbcProjection.atLeastOnce(
@@ -669,9 +669,9 @@ class R2dbcProjectionSpec
}
"skip failing events when using RecoveryStrategy.skip, save after 1" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val projection =
R2dbcProjection
@@ -693,9 +693,9 @@ class R2dbcProjectionSpec
}
"skip failing events when using RecoveryStrategy.skip, save after 2" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val projection =
R2dbcProjection
@@ -717,9 +717,9 @@ class R2dbcProjectionSpec
}
"restart from previous offset - handler throwing an exception, save after
1" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
def atLeastOnceProjection(failWhenOffset: Long => Boolean = _ => false) =
R2dbcProjection
@@ -762,9 +762,9 @@ class R2dbcProjectionSpec
}
"restart from previous offset - handler throwing an exception, save after
2" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
offsetStore.readOffset[Long]().futureValue shouldBe empty
@@ -813,9 +813,9 @@ class R2dbcProjectionSpec
}
"save offset after number of elements" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
import pekko.actor.typed.scaladsl.adapter._
val sourceProbe = new AtomicReference[TestPublisher.Probe[Envelope]]()
@@ -858,9 +858,9 @@ class R2dbcProjectionSpec
}
"save offset after idle duration" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
import pekko.actor.typed.scaladsl.adapter._
val sourceProbe = new AtomicReference[TestPublisher.Probe[Envelope]]()
@@ -904,9 +904,9 @@ class R2dbcProjectionSpec
}
"verify offsets before processing an envelope" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val verifiedProbe = createTestProbe[Long]()
val testVerification = (offset: Long) => {
@@ -934,9 +934,9 @@ class R2dbcProjectionSpec
}
"skip record if offset verification fails before processing envelope" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val testVerification = (offset: Long) => {
if (offset == 3L)
@@ -961,9 +961,9 @@ class R2dbcProjectionSpec
}
"handle async projection and store offset" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val result = new StringBuffer()
@@ -994,9 +994,9 @@ class R2dbcProjectionSpec
"A R2DBC flow projection" must {
"persist projection and offset" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
offsetShouldBeEmpty()
@@ -1072,9 +1072,9 @@ class R2dbcProjectionSpec
}
"call start and stop of the handler" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val handlerProbe = createTestProbe[String]()
val handler = new LifecycleHandler(handlerProbe.ref)
@@ -1110,9 +1110,9 @@ class R2dbcProjectionSpec
}
"call start and stop of the handler when using TestKit.runWithTestSink" in
{
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val handlerProbe = createTestProbe[String]()
val handler = new LifecycleHandler(handlerProbe.ref)
@@ -1149,9 +1149,9 @@ class R2dbcProjectionSpec
}
"call start and stop of handler when restarted" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val handlerProbe = createTestProbe[String]()
@volatile var _handler: Option[LifecycleHandler] = None
@@ -1220,9 +1220,9 @@ class R2dbcProjectionSpec
}
"call start and stop of handler when failed but no restart" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val handlerProbe = createTestProbe[String]()
val failOnceOnOffset = new AtomicInteger(4)
@@ -1248,9 +1248,9 @@ class R2dbcProjectionSpec
}
"be able to stop when retrying" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val handlerProbe = createTestProbe[String]()
val handler = new LifecycleHandler(handlerProbe.ref, alwaysFailOnOffset
= 4)
@@ -1280,9 +1280,9 @@ class R2dbcProjectionSpec
"R2dbcProjection management" must {
"restart from beginning when offset is cleared" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val projection =
R2dbcProjection
@@ -1312,9 +1312,9 @@ class R2dbcProjectionSpec
}
"restart from updated offset" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val projection =
R2dbcProjection
@@ -1345,9 +1345,9 @@ class R2dbcProjectionSpec
}
"pause projection" in {
- implicit val entityId = UUID.randomUUID().toString
+ implicit val entityId: String = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
- implicit val offsetStore = createOffsetStore(projectionId)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId)
val projection =
R2dbcProjection
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
index 61007e1..163b5ee 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
@@ -376,7 +376,7 @@ class R2dbcTimestampOffsetProjectionSpec
val envelopes = createEnvelopes(pid, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val projection =
R2dbcProjection.exactlyOnce(projectionId, Some(settings),
sourceProvider, handler = () => new ConcatHandler)
@@ -393,7 +393,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val bogusEventHandler = new ConcatHandler(_.sequenceNr == 4)
@@ -414,7 +414,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val bogusEventHandler = new ConcatHandler(_.sequenceNr == 6)
@@ -435,7 +435,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val statusProbe = createTestProbe[TestStatusObserver.Status]()
val progressProbe =
createTestProbe[TestStatusObserver.OffsetProgress[Envelope]]()
@@ -478,7 +478,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val bogusEventHandler = new ConcatHandler(_.sequenceNr == 4)
@@ -504,7 +504,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
def exactlyOnceProjection(failWhen: EventEnvelope[String] => Boolean = _
=> false) = {
R2dbcProjection.exactlyOnce(
@@ -535,7 +535,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val projection =
R2dbcProjection.exactlyOnce(projectionId, Some(settings),
sourceProvider, handler = () => new ConcatHandler)
@@ -555,7 +555,7 @@ class R2dbcTimestampOffsetProjectionSpec
val startTime = Instant.now()
val envelopes1 = createEnvelopesUnknownSequenceNumbers(startTime, pid1,
pid2)
val sourceProvider1 = createSourceProvider(envelopes1)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider1)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider1)
val projection1 =
R2dbcProjection.exactlyOnce(projectionId, Some(settings),
sourceProvider1, handler = () => new ConcatHandler)
@@ -591,7 +591,7 @@ class R2dbcTimestampOffsetProjectionSpec
env
}
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val projection =
R2dbcProjection.exactlyOnce(projectionId, Some(settings),
sourceProvider, handler = () => new ConcatHandler)
@@ -611,7 +611,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val handlerProbe = createTestProbe[String]("calls-to-handler")
@@ -638,7 +638,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val handlerProbe = createTestProbe[String]("calls-to-handler")
@@ -667,7 +667,7 @@ class R2dbcTimestampOffsetProjectionSpec
val startTime = Instant.now()
val envelopes1 = createEnvelopesUnknownSequenceNumbers(startTime, pid1,
pid2)
val sourceProvider1 = createBacktrackingSourceProvider(envelopes1)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider1)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider1)
val handlerProbe = createTestProbe[String]()
@@ -716,7 +716,7 @@ class R2dbcTimestampOffsetProjectionSpec
env
}
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val handlerProbe = createTestProbe[String]("calls-to-handler")
@@ -742,7 +742,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val result = new StringBuffer()
@@ -776,7 +776,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val result1 = new StringBuffer()
val result2 = new StringBuffer()
@@ -815,7 +815,7 @@ class R2dbcTimestampOffsetProjectionSpec
val startTime = Instant.now()
val sourceProvider = new TestSourceProviderWithInput()
- implicit val offsetStore =
+ implicit val offsetStore: R2dbcOffsetStore =
new R2dbcOffsetStore(projectionId, Some(sourceProvider), system,
settings, r2dbcExecutor)
val result1 = new StringBuffer()
@@ -875,7 +875,7 @@ class R2dbcTimestampOffsetProjectionSpec
env
}
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val result = new StringBuffer()
@@ -911,7 +911,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val projection =
R2dbcProjection.atLeastOnce(projectionId, Some(settings),
sourceProvider, handler = () => new ConcatHandler)
@@ -931,7 +931,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val projection =
R2dbcProjection.atLeastOnce(projectionId, Some(settings),
sourceProvider, handler = () => new ConcatHandler)
@@ -952,7 +952,7 @@ class R2dbcTimestampOffsetProjectionSpec
val startTime = Instant.now()
val sourceProvider = new TestSourceProviderWithInput()
- implicit val offsetStore =
+ implicit val offsetStore: R2dbcOffsetStore =
new R2dbcOffsetStore(projectionId, Some(sourceProvider), system,
settings, r2dbcExecutor)
val projectionRef = spawn(
@@ -989,7 +989,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val failOnce = new AtomicBoolean(true)
val failPredicate = (ev: EventEnvelope[String]) => {
@@ -1026,7 +1026,7 @@ class R2dbcTimestampOffsetProjectionSpec
env
}
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val projection =
R2dbcProjection.atLeastOnce(projectionId, Some(settings),
sourceProvider, handler = () => new ConcatHandler)
@@ -1045,7 +1045,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val result = new StringBuffer()
@@ -1075,7 +1075,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val failOnce = new AtomicBoolean(true)
val failPredicate = (ev: EventEnvelope[String]) => {
@@ -1120,7 +1120,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val result1 = new StringBuffer()
val result2 = new StringBuffer()
@@ -1156,7 +1156,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val startTime = Instant.now()
val sourceProvider = new TestSourceProviderWithInput()
- implicit val offsetStore =
+ implicit val offsetStore: R2dbcOffsetStore =
new R2dbcOffsetStore(projectionId, Some(sourceProvider), system,
settings, r2dbcExecutor)
val result1 = new StringBuffer()
@@ -1216,7 +1216,7 @@ class R2dbcTimestampOffsetProjectionSpec
}
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val projection =
R2dbcProjection.atLeastOnce(projectionId, Some(settings),
sourceProvider, handler = () => new ConcatHandler)
@@ -1238,7 +1238,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
offsetShouldBeEmpty()
@@ -1267,7 +1267,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopesWithDuplicates(pid1, pid2)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
info(s"pid1 [$pid1], pid2 [$pid2]")
@@ -1298,7 +1298,7 @@ class R2dbcTimestampOffsetProjectionSpec
val startTime = Instant.now()
val sourceProvider = new TestSourceProviderWithInput()
- implicit val offsetStore =
+ implicit val offsetStore: R2dbcOffsetStore =
new R2dbcOffsetStore(projectionId, Some(sourceProvider), system,
settings, r2dbcExecutor)
val flowHandler =
@@ -1351,7 +1351,7 @@ class R2dbcTimestampOffsetProjectionSpec
env
}
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
offsetShouldBeEmpty()
@@ -1383,7 +1383,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val projection =
R2dbcProjection
@@ -1417,7 +1417,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6)
val sourceProvider = createSourceProvider(envelopes)
- implicit val offsetStore = createOffsetStore(projectionId,
sourceProvider)
+ implicit val offsetStore: R2dbcOffsetStore =
createOffsetStore(projectionId, sourceProvider)
val projection =
R2dbcProjection
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]