This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 6a91c5e Port remaining changes for #357 and
akka/akka-persistence-r2dbc#348 (#375)
6a91c5e is described below
commit 6a91c5ec7b5624475c9080e297140b16f9d07fcb
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 10:15:52 2026 +0100
Port remaining changes for #357 and akka/akka-persistence-r2dbc#348 (#375)
* Port changes from akka-persistence-r2dbc PR #348 (filtered event fix)
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/5e0c4694-7a40-473f-af9b-38a34621e7fc
Co-authored-by: pjfanning <[email protected]>
* scalafmt
* move file
* Update envelope-origin.excludes
* Update EventsBySlicePubSubSpec.scala
* Remove skipPubSubTooFarAhead - it came from a newer Akka version and is not
part of the ported PRs
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/470ac23a-d70a-468f-a7ff-eede5c2667fa
Co-authored-by: pjfanning <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../r2dbc/internal/EnvelopeOrigin.scala | 42 ++++++++
.../pekko/persistence/r2dbc/internal/PubSub.scala | 4 +-
.../r2dbc/query/scaladsl/R2dbcReadJournal.scala | 13 ++-
.../query/EventsBySliceBacktrackingSpec.scala | 4 +
.../r2dbc/query/EventsBySlicePerfSpec.scala | 117 +++++++++++++++++++--
.../r2dbc/query/EventsBySlicePubSubSpec.scala | 56 ++++++----
.../envelope-origin.excludes | 25 +++++
.../r2dbc/internal/R2dbcOffsetStore.scala | 32 +++---
.../r2dbc/internal/R2dbcProjectionImpl.scala | 29 ++---
.../r2dbc/R2dbcTimestampOffsetProjectionSpec.scala | 27 +++--
.../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 53 +++++++++-
11 files changed, 319 insertions(+), 83 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EnvelopeOrigin.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EnvelopeOrigin.scala
new file mode 100644
index 0000000..7a1af02
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EnvelopeOrigin.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import org.apache.pekko
+import pekko.annotation.InternalStableApi
+import pekko.persistence.query.typed.EventEnvelope
+
+/**
+ * INTERNAL API: values of the `source` field of an [[EventEnvelope]], recording where the envelope came from.
+ */
+@InternalStableApi private[pekko] object EnvelopeOrigin {
+ val SourceQuery = "" // regular query result, event payload loaded
+ val SourceBacktracking = "BT" // backtracking query result, event payload not included
+ val SourcePubSub = "PS" // envelope published via PubSub
+
+ def fromQuery(env: EventEnvelope[_]): Boolean = // true when source is SourceQuery
+ env.source == SourceQuery
+
+ def fromBacktracking(env: EventEnvelope[_]): Boolean = // true when source is SourceBacktracking
+ env.source == SourceBacktracking
+
+ def fromPubSub(env: EventEnvelope[_]): Boolean = // true when source is SourcePubSub
+ env.source == SourcePubSub
+
+ def isFilteredEvent(env: Any): Boolean = // true only for an EventEnvelope flagged as filtered
+ env match {
+ case e: EventEnvelope[_] => e.filtered
+ case _ => false
+ }
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
index f8d9562..6d901d2 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
@@ -125,7 +125,9 @@ import org.slf4j.LoggerFactory
timestamp.toEpochMilli,
pr.metadata,
entityType,
- slice)
+ slice,
+ filtered = false,
+ source = EnvelopeOrigin.SourcePubSub)
eventTopic(entityType, slice) ! Topic.Publish(envelope)
}
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 5be7856..3ead172 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -40,6 +40,7 @@ import pekko.persistence.query.{ EventEnvelope =>
ClassicEventEnvelope }
import pekko.persistence.r2dbc.QuerySettings
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.ContinuousQuery
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
import pekko.persistence.r2dbc.internal.PubSub
import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
import pekko.persistence.typed.PersistenceId
@@ -84,6 +85,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
val createEnvelope: (TimestampOffset, SerializedJournalRow) =>
EventEnvelope[Any] = (offset, row) => {
val event = row.payload.map(payload =>
serialization.deserialize(payload, row.serId, row.serManifest).get)
val metadata = row.metadata.map(meta =>
serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get)
+ val source = if (event.isDefined) EnvelopeOrigin.SourceQuery else
EnvelopeOrigin.SourceBacktracking
new EventEnvelope(
offset,
row.persistenceId,
@@ -92,7 +94,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
row.dbTimestamp.toEpochMilli,
metadata,
row.entityType,
- row.slice)
+ row.slice,
+ filtered = false,
+ source)
}
val extractOffset: EventEnvelope[Any] => TimestampOffset = env =>
env.offset.asInstanceOf[TimestampOffset]
@@ -196,7 +200,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
// cache of seen pid/seqNr
var seen = mutable.LinkedHashSet.empty[(String, Long)]
env => {
- if (env.eventOption.isEmpty) {
+ if (EnvelopeOrigin.fromBacktracking(env)) {
// don't deduplicate from backtracking
env :: Nil
} else {
@@ -322,6 +326,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
row.payload.map(payload => serialization.deserialize(payload, row.serId,
row.serManifest).get.asInstanceOf[Event])
val offset = TimestampOffset(row.dbTimestamp, row.readDbTimestamp,
Map(row.persistenceId -> row.seqNr))
val metadata = row.metadata.map(meta =>
serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get)
+ val source = if (event.isDefined) EnvelopeOrigin.SourceQuery else
EnvelopeOrigin.SourceBacktracking
new EventEnvelope(
offset,
row.persistenceId,
@@ -330,7 +335,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
row.dbTimestamp.toEpochMilli,
metadata,
row.entityType,
- row.slice)
+ row.slice,
+ filtered = false,
+ source)
}
private def deserializeRow(row: SerializedJournalRow): ClassicEventEnvelope
= {
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index b0dd655..061b6f9 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -29,6 +29,7 @@ import pekko.persistence.r2dbc.QuerySettings
import pekko.persistence.r2dbc.TestConfig
import pekko.persistence.r2dbc.TestData
import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.internal.InstantFactory
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
@@ -114,17 +115,20 @@ class EventsBySliceBacktrackingSpec
env1.persistenceId shouldBe pid1
env1.sequenceNr shouldBe 1L
env1.eventOption shouldBe Some("e1-1")
+ env1.source shouldBe EnvelopeOrigin.SourceQuery
val env2 = result.expectNext()
env2.persistenceId shouldBe pid1
env2.sequenceNr shouldBe 2L
env2.eventOption shouldBe Some("e1-2")
+ env2.source shouldBe EnvelopeOrigin.SourceQuery
// first backtracking query kicks in immediately after the first normal
query has finished
// and it also emits duplicates (by design)
val env3 = result.expectNext()
env3.persistenceId shouldBe pid1
env3.sequenceNr shouldBe 1L
+ env3.source shouldBe EnvelopeOrigin.SourceBacktracking
// event payload isn't included in backtracking results
env3.eventOption shouldBe None
// but it can be lazy loaded
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
index 5b12b34..51599dc 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
@@ -27,24 +27,48 @@ import pekko.persistence.query.NoOffset
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.r2dbc.TestActors
import pekko.persistence.r2dbc.TestActors.Persister.Persist
+import pekko.persistence.r2dbc.TestActors.Persister.PersistWithAck
import pekko.persistence.r2dbc.TestConfig
import pekko.persistence.r2dbc.TestData
import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.QuerySettings
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.stream.scaladsl.Sink
+import pekko.stream.scaladsl.Source
+import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
+object EventsBySlicePerfSpec { // shared config and helper types for the perf spec
+ private val config = ConfigFactory // publish-events on, backtracking on, 3s refresh-interval
+ .parseString("""
+ pekko.persistence.r2dbc.journal.publish-events = on
+ pekko.persistence.r2dbc.query {
+ backtracking.enabled = on
+ refresh-interval = 3s
+ #buffer-size = 100
+ }
+ """)
+ .withFallback(TestConfig.config)
+
+ private final case class PidSeqNr(pid: String, seqNr: Long) // one event; a Set of these counts distinct events
+}
+
class EventsBySlicePerfSpec
- extends
ScalaTestWithActorTestKit(TestConfig.backtrackingDisabledConfig.withFallback(TestConfig.config))
+ extends ScalaTestWithActorTestKit(EventsBySlicePerfSpec.config)
with AnyWordSpecLike
with TestDbLifecycle
- with TestData
- with LogCapturing {
+ with LogCapturing
+ with TestData {
+ import EventsBySlicePerfSpec.PidSeqNr
override def typedSystem: ActorSystem[_] = system
private val query =
PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
+ private lazy val r2dbcQuerySettings =
+
QuerySettings(testKit.system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
+
"EventsBySlices performance" should {
"retrieve from several slices" in {
@@ -94,22 +118,97 @@ class EventsBySlicePerfSpec
val counts: Seq[Future[Int]] = ranges.map { range =>
query
.currentEventsBySlices[String](entityType, range.min, range.max,
NoOffset)
- .runWith(Sink.fold(0) { case (acc, _) =>
- if (acc > 0 && acc % 100 == 0)
- println(s"#$iteration Reading [$acc] events from slices
[${range.min}-${range.max}] " +
- s"took [${(System.nanoTime() - t1) / 1000 / 1000}] ms")
- acc + 1
+ .runWith(Sink.fold(0) { case (acc, env) =>
+ if (EnvelopeOrigin.fromQuery(env)) {
+ if (acc > 0 && acc % 100 == 0)
+ println(s"#$iteration Reading [$acc] events from slices
[${range.min}-${range.max}] " +
+ s"took [${(System.nanoTime() - t1) / 1000 / 1000}] ms")
+ acc + 1
+ } else {
+ acc
+ }
})
}
implicit val ec: ExecutionContext = testKit.system.executionContext
val total = Await.result(Future.sequence(counts).map(_.sum),
30.seconds)
total shouldBe totalNumberOfEvents
println(
- s"#$iteration Reading all [$totalNumberOfEvents] events from
[${ranges.size}] eventsBySlices " +
+ s"#$iteration Reading all [$totalNumberOfEvents] events from
[${ranges.size}] slices with currentEventsBySlices " +
s"took [${(System.nanoTime() - t1) / 1000 / 1000}] ms")
}
}
+ "write and read concurrently" in { // persist from several actors while eventsBySlices reads distinct pid/seqNr
+ // increase these properties for "real" testing
+ // also, remove LogCapturing and change logback log levels for "real"
testing
+ val numberOfEventsPerWriter = 20
+ val writeConcurrency = 10
+ val writeRps = 300
+ val iterations = 2
+ val totalNumberOfEvents = writeConcurrency * numberOfEventsPerWriter
+ val verbosePrintLag = false
+
+ implicit val ec: ExecutionContext = testKit.system.executionContext
+
+ val entityType = nextEntityType()
+ val persistenceIds = (1 to writeConcurrency).map(_ =>
nextPid(entityType)).toVector
+
+ (1 to iterations).foreach { iteration =>
+ val t0 = System.nanoTime()
+ val writeProbe = createTestProbe[Done]()
+ val persisters = persistenceIds.map(pid =>
testKit.spawn(TestActors.Persister(pid)))
+ Source(1 to numberOfEventsPerWriter)
+ .mapConcat(n => persisters.map(ref => ref -> n))
+ .throttle(writeRps / 10, 100.millis)
+ .map { case (ref, n) =>
+ ref ! Persist(s"e-$n")
+ }
+ .runWith(Sink.ignore)
+ .foreach { _ =>
+ // stop them at the end
+ persisters.foreach(_ ! TestActors.Persister.Stop(writeProbe.ref))
+ }
+
+ val done: Future[Done] =
+ query
+ .eventsBySlices[String](entityType, 0,
persistenceExt.numberOfSlices - 1, NoOffset)
+ .scan(Set.empty[PidSeqNr]) { case (acc, env) =>
+ val newAcc = acc + PidSeqNr(env.persistenceId, env.sequenceNr)
+
+ if (verbosePrintLag) {
+ val duplicate = if (newAcc.size == acc.size) " (duplicate)"
else ""
+ val lagMillis = System.currentTimeMillis() - env.timestamp
+ val delayed = // lag beyond the threshold expected for the envelope's origin
+ (EnvelopeOrigin.fromPubSub(env) && lagMillis > 50) ||
+ (EnvelopeOrigin.fromQuery(env) && lagMillis >
r2dbcQuerySettings.refreshInterval.toMillis + 300) ||
+ (EnvelopeOrigin.fromBacktracking( // backtracking may lag up to half the backtracking window
+ env) && lagMillis >
r2dbcQuerySettings.backtrackingWindow.toMillis / 2 + 300)
+ if (delayed)
+ println(
+ s"# received ${newAcc.size}$duplicate from ${env.source}:
${env.persistenceId} seqNr ${env.sequenceNr}, lag $lagMillis ms")
+ }
+
+ if (newAcc.size != acc.size && (newAcc.size % 100 == 0))
+ println(s"#$iteration Reading [${newAcc.size}] events " +
+ s"took [${(System.nanoTime() - t0) / 1000 / 1000}] ms")
+ newAcc
+
+ }
+ .takeWhile(_.size < totalNumberOfEvents)
+ .runWith(Sink.ignore)
+
+ writeProbe.receiveMessages(persisters.size, (totalNumberOfEvents /
writeRps).seconds + 10.seconds)
+ println(
+ s"#$iteration Persisting all [$totalNumberOfEvents] events from
[${persistenceIds.size}] persistent " +
+ s"actors took [${(System.nanoTime() - t0) / 1000 / 1000}] ms")
+
+ Await.result(done, 30.seconds)
+ println(
+ s"#$iteration Reading all [$totalNumberOfEvents] events with
eventsBySlices " +
+ s"took [${(System.nanoTime() - t0) / 1000 / 1000}] ms")
+ }
+ }
+
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index edbb96f..660bdda 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -36,10 +36,11 @@ import
pekko.persistence.r2dbc.TestActors.Persister.PersistWithAck
import pekko.persistence.r2dbc.TestConfig
import pekko.persistence.r2dbc.TestData
import pekko.persistence.r2dbc.TestDbLifecycle
-import pekko.persistence.r2dbc.internal.InstantFactory
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
import pekko.persistence.r2dbc.internal.PubSub
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
+import pekko.stream.scaladsl.Keep
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
import pekko.stream.testkit.TestSubscriber
@@ -90,26 +91,45 @@ class EventsBySlicePubSubSpec
val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)
}
- private def createEnvelope(pid: PersistenceId, seqNr: Long, evt: String):
EventEnvelope[String] = {
- val now = InstantFactory.now()
+ private def createEnvelope( // builds a pub-sub envelope for pid/seqNr with event time `time`
+ pid: PersistenceId,
+ seqNr: Long,
+ evt: String,
+ time: Instant = Instant.now()): EventEnvelope[String] = {
EventEnvelope(
- TimestampOffset(now, Map(pid.id -> seqNr)),
+ TimestampOffset(time, time, Map(pid.id -> seqNr)), // timestamp and readTimestamp both set to `time`
pid.id,
seqNr,
evt,
- now.toEpochMilli,
+ time.toEpochMilli,
pid.entityTypeHint,
- query.sliceForPersistenceId(pid.id))
+ query.sliceForPersistenceId(pid.id),
+ filtered = false,
+ source = EnvelopeOrigin.SourcePubSub) // tags the envelope as delivered via pub-sub
}
+ def backtrackingEnvelope(env: EventEnvelope[String]): EventEnvelope[String] = // copy of env as a backtracking result
+ new EventEnvelope[String](
+ env.offset,
+ env.persistenceId,
+ env.sequenceNr,
+ eventOption = None, // backtracking results carry no event payload
+ env.timestamp,
+ env.eventMetadata,
+ env.entityType,
+ env.slice,
+ filtered = false,
+ source = EnvelopeOrigin.SourceBacktracking)
+
private val entityType = nextEntityType()
private val pidA = PersistenceId(entityType, "A")
private val pidB = PersistenceId(entityType, "B")
- private val envA1 = createEnvelope(pidA, 1L, "a1")
- private val envA2 = createEnvelope(pidA, 2L, "a2")
- private val envA3 = createEnvelope(pidA, 3L, "a3")
- private val envB1 = createEnvelope(pidB, 1L, "b1")
- private val envB2 = createEnvelope(pidB, 2L, "b2")
+ val now = Instant.now()
+ private val envA1 = createEnvelope(pidA, 1L, "a1", now)
+ private val envA2 = createEnvelope(pidA, 2L, "a2", now.plusMillis(1))
+ private val envA3 = createEnvelope(pidA, 3L, "a3", now.plusMillis(2))
+ private val envB1 = createEnvelope(pidB, 1L, "b1", now.plusMillis(3))
+ private val envB2 = createEnvelope(pidB, 2L, "b2", now.plusMillis(4))
"EventsBySlices pub-sub" should {
@@ -133,7 +153,9 @@ class EventsBySlicePubSubSpec
// 10 was requested
for (i <- 1 to 10) {
- result.expectNext().event shouldBe s"e-$i"
+ val env = result.expectNext()
+ env.event shouldBe s"e-$i"
+ env.source shouldBe EnvelopeOrigin.SourcePubSub
}
result.expectNoMessage()
@@ -166,15 +188,7 @@ class EventsBySlicePubSubSpec
}
"not deduplicate from backtracking" in {
- val envA2back = new EventEnvelope[String](
- envA2.offset,
- envA2.persistenceId,
- envA2.sequenceNr,
- eventOption = None,
- envA2.timestamp,
- envA2.eventMetadata,
- envA2.entityType,
- envA2.slice)
+ val envA2back = backtrackingEnvelope(envA2)
val out = Source(List(envA1, envA2, envB1, envA2back, envB2))
.via(query.deduplicate(capacity = 10))
.runWith(Sink.seq)
diff --git
a/projection/src/main/mima-filters/2.0.x.backwards.excludes/envelope-origin.excludes
b/projection/src/main/mima-filters/2.0.x.backwards.excludes/envelope-origin.excludes
new file mode 100644
index 0000000..b8338e1
--- /dev/null
+++
b/projection/src/main/mima-filters/2.0.x.backwards.excludes/envelope-origin.excludes
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# RecordWithOffset changed (envelopeLoaded replaced by fromBacktracking/fromPubSub) when porting https://github.com/akka/akka-persistence-r2dbc/pull/348
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore$RecordWithOffset$")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.copy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.unapply")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.copy*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.envelopeLoaded")
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index c85c50f..01d26c8 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -38,6 +38,7 @@ import pekko.persistence.query.UpdatedDurableState
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.typed.PersistenceId
@@ -63,7 +64,8 @@ object R2dbcOffsetStore {
record: Record,
offset: TimestampOffset,
strictSeqNr: Boolean,
- envelopeLoaded: Boolean)
+ fromBacktracking: Boolean,
+ fromPubSub: Boolean)
object State {
val empty: State = State(Map.empty, Vector.empty, Instant.EPOCH)
@@ -658,14 +660,14 @@ private[projection] class R2dbcOffsetStore(
val prevSeqNr = currentInflight.getOrElse(pid,
currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))
def logUnexpected(): Unit = {
- if (viaPubSub(recordWithOffset.offset))
+ if (recordWithOffset.fromPubSub)
logger.debug(
"Rejecting pub-sub envelope, unexpected sequence number [{}] for
pid [{}], previous sequence number [{}]. Offset: {}",
seqNr: java.lang.Long,
pid,
prevSeqNr: java.lang.Long,
recordWithOffset.offset)
- else if (recordWithOffset.envelopeLoaded)
+ else if (!recordWithOffset.fromBacktracking)
logger.debug(
"Rejecting unexpected sequence number [{}] for pid [{}], previous
sequence number [{}]. Offset: {}",
seqNr: java.lang.Long,
@@ -682,13 +684,13 @@ private[projection] class R2dbcOffsetStore(
}
def logUnknown(): Unit = {
- if (viaPubSub(recordWithOffset.offset)) {
+ if (recordWithOffset.fromPubSub) {
logger.debug(
"Rejecting pub-sub envelope, unknown sequence number [{}] for pid
[{}] (might be accepted later): {}",
seqNr: java.lang.Long,
pid,
recordWithOffset.offset)
- } else if (recordWithOffset.envelopeLoaded) {
+ } else if (!recordWithOffset.fromBacktracking) {
// This may happen rather frequently when using `publish-events`,
after reconnecting and such.
logger.debug(
"Rejecting unknown sequence number [{}] for pid [{}] (might be
accepted later): {}",
@@ -713,7 +715,7 @@ private[projection] class R2dbcOffsetStore(
// currentInFlight contains those that have been processed or about
to be processed in Flow,
// but offset not saved yet => ok to handle as duplicate
FutureFalse
- } else if (recordWithOffset.envelopeLoaded) {
+ } else if (!recordWithOffset.fromBacktracking) {
logUnexpected()
FutureFalse
} else {
@@ -745,7 +747,7 @@ private[projection] class R2dbcOffsetStore(
previousTimestamp,
before)
true
- } else if (recordWithOffset.envelopeLoaded) {
+ } else if (!recordWithOffset.fromBacktracking) {
logUnknown()
false
} else {
@@ -777,13 +779,6 @@ private[projection] class R2dbcOffsetStore(
}
}
- private def viaPubSub(offset: Offset): Boolean = {
- offset match {
- case t: TimestampOffset => t.timestamp == t.readTimestamp
- case _ => false
- }
- }
-
@tailrec final def addInflight[Envelope](envelope: Envelope): Unit = {
createRecordWithOffset(envelope) match {
case Some(recordWithOffset) =>
@@ -1013,7 +1008,8 @@ private[projection] class R2dbcOffsetStore(
Record(eventEnvelope.persistenceId, eventEnvelope.sequenceNr,
timestampOffset.timestamp),
timestampOffset,
strictSeqNr = true,
- envelopeLoaded = eventEnvelope.eventOption.isDefined))
+ fromBacktracking = EnvelopeOrigin.fromBacktracking(eventEnvelope),
+ fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope)))
case change: UpdatedDurableState[_] if
change.offset.isInstanceOf[TimestampOffset] =>
val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
Some(
@@ -1021,7 +1017,8 @@ private[projection] class R2dbcOffsetStore(
Record(change.persistenceId, change.revision,
timestampOffset.timestamp),
timestampOffset,
strictSeqNr = false,
- envelopeLoaded = change.value != null))
+ fromBacktracking = change.value == null,
+ fromPubSub = false))
case change: DeletedDurableState[_] if
change.offset.isInstanceOf[TimestampOffset] =>
val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
Some(
@@ -1029,7 +1026,8 @@ private[projection] class R2dbcOffsetStore(
Record(change.persistenceId, change.revision,
timestampOffset.timestamp),
timestampOffset,
strictSeqNr = false,
- envelopeLoaded = true))
+ fromBacktracking = false,
+ fromPubSub = false))
case change: DurableStateChange[_] if
change.offset.isInstanceOf[TimestampOffset] =>
// in case additional types are added
throw new IllegalArgumentException(
diff --git
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index 0ee478d..ce2ff55 100644
---
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -23,7 +23,6 @@ import scala.util.control.NonFatal
import org.apache.pekko
import pekko.Done
-import pekko.NotUsed
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.event.Logging
@@ -32,6 +31,7 @@ import pekko.persistence.query.DeletedDurableState
import pekko.persistence.query.UpdatedDurableState
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.query.typed.scaladsl.LoadEventQuery
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.state.scaladsl.DurableStateStore
import pekko.persistence.state.scaladsl.GetObjectResult
@@ -77,6 +77,7 @@ import org.slf4j.LoggerFactory
*/
@InternalApi
private[projection] object R2dbcProjectionImpl {
+ import EnvelopeOrigin.{ fromBacktracking, isFilteredEvent }
val log: Logger = LoggerFactory.getLogger(classOf[R2dbcProjectionImpl[_, _]])
private val FutureDone: Future[Done] = Future.successful(Done)
@@ -96,7 +97,8 @@ private[projection] object R2dbcProjectionImpl {
def loadEnvelope[Envelope](env: Envelope, sourceProvider: SourceProvider[_,
Envelope])(implicit
ec: ExecutionContext): Future[Envelope] = {
env match {
- case eventEnvelope: EventEnvelope[_] if
eventEnvelope.eventOption.isEmpty && !skipEnvelope(eventEnvelope) =>
+ case eventEnvelope: EventEnvelope[_]
+ if fromBacktracking(eventEnvelope) &&
eventEnvelope.eventOption.isEmpty && !eventEnvelope.filtered =>
val pid = eventEnvelope.persistenceId
val seqNr = eventEnvelope.sequenceNr
(sourceProvider match {
@@ -166,7 +168,7 @@ private[projection] object R2dbcProjectionImpl {
override def process(envelope: Envelope): Future[Done] = {
offsetStore.isAccepted(envelope).flatMap {
case true =>
- if (skipEnvelope(envelope)) {
+ if (isFilteredEvent(envelope)) {
val offset = sourceProvider.extractOffset(envelope)
offsetStore.saveOffset(offset)
} else {
@@ -207,7 +209,7 @@ private[projection] object R2dbcProjectionImpl {
Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env,
sourceProvider))).flatMap {
loadedEnvelopes =>
val offsets =
loadedEnvelopes.iterator.map(sourceProvider.extractOffset).toVector
- val filteredEnvelopes = loadedEnvelopes.filterNot(skipEnvelope)
+ val filteredEnvelopes =
loadedEnvelopes.filterNot(isFilteredEvent)
if (filteredEnvelopes.isEmpty) {
offsetStore.saveOffsets(offsets)
} else {
@@ -236,7 +238,7 @@ private[projection] object R2dbcProjectionImpl {
override def process(envelope: Envelope): Future[Done] = {
offsetStore.isAccepted(envelope).flatMap {
case true =>
- if (skipEnvelope(envelope)) {
+ if (isFilteredEvent(envelope)) {
offsetStore.addInflight(envelope)
FutureDone
} else {
@@ -269,7 +271,7 @@ private[projection] object R2dbcProjectionImpl {
override def process(envelope: Envelope): Future[Done] = {
offsetStore.isAccepted(envelope).flatMap {
case true =>
- if (skipEnvelope(envelope)) {
+ if (isFilteredEvent(envelope)) {
offsetStore.addInflight(envelope)
FutureDone
} else {
@@ -304,7 +306,7 @@ private[projection] object R2dbcProjectionImpl {
} else {
Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env,
sourceProvider))).flatMap {
loadedEnvelopes =>
- val filteredEnvelopes = loadedEnvelopes.filterNot(skipEnvelope)
+ val filteredEnvelopes =
loadedEnvelopes.filterNot(isFilteredEvent)
if (filteredEnvelopes.isEmpty) {
offsetStore.addInflights(loadedEnvelopes)
FutureDone
@@ -337,7 +339,7 @@ private[projection] object R2dbcProjectionImpl {
.isAccepted(env)
.flatMap { ok =>
if (ok) {
- if (skipEnvelope(env)) {
+ if (isFilteredEvent(env)) {
log.info("atLeastOnceFlow doesn't support of skipping
envelopes. Envelope [{}] still emitted.", env)
}
loadEnvelope(env, sourceProvider).map { loadedEnvelope =>
@@ -378,17 +380,6 @@ private[projection] object R2dbcProjectionImpl {
delegate.stop()
}
- private def skipEnvelope[Envelope](env: Envelope): Boolean = {
- env match {
- case e: EventEnvelope[_] =>
- e.eventMetadata match {
- case Some(NotUsed) => true
- case _ => false
- }
- case _ => false
- }
- }
-
}
/**
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
index 00048b7..3940882 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
@@ -38,6 +38,7 @@ import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
import pekko.persistence.query.typed.scaladsl.LoadEventQuery
import pekko.persistence.typed.PersistenceId
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
import pekko.projection.BySlicesSourceProvider
import pekko.projection.HandlerRecoveryStrategy
import pekko.projection.ProjectionBehavior
@@ -279,7 +280,9 @@ class R2dbcTimestampOffsetProjectionSpec
env.timestamp,
env.eventMetadata,
env.entityType,
- env.slice)
+ env.slice,
+ env.filtered,
+ source = EnvelopeOrigin.SourceBacktracking)
def createEnvelopes(pid: Pid, numberOfEvents: Int):
immutable.IndexedSeq[EventEnvelope[String]] = {
(1 to numberOfEvents).map { n =>
@@ -360,16 +363,18 @@ class R2dbcTimestampOffsetProjectionSpec
}
}
- def markAsNotUsed[A](env: EventEnvelope[A]): EventEnvelope[A] = {
- new EventEnvelope(
+ def markAsFilteredEvent[A](env: EventEnvelope[A]): EventEnvelope[A] = { // same envelope with filtered = true
+ new EventEnvelope[A](
env.offset,
env.persistenceId,
env.sequenceNr,
env.eventOption,
env.timestamp,
- eventMetadata = Some(NotUsed),
+ env.eventMetadata, // metadata kept; filtering is signalled by the `filtered` flag instead of Some(NotUsed)
env.entityType,
- env.slice)
+ env.slice,
+ filtered = true,
+ env.source)
}
"A R2DBC exactly-once projection with TimestampOffset" must {
@@ -590,7 +595,7 @@ class R2dbcTimestampOffsetProjectionSpec
val envelopes = createEnvelopes(pid, 6).map { env =>
if (env.event == "e3" || env.event == "e4" || env.event == "e6")
- markAsNotUsed(env)
+ markAsFilteredEvent(env)
else
env
}
@@ -715,7 +720,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6).map { env =>
if (env.event == "e3" || env.event == "e4" || env.event == "e6")
- markAsNotUsed(env)
+ markAsFilteredEvent(env)
else
env
}
@@ -874,7 +879,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6).map { env =>
if (env.event == "e3" || env.event == "e4" || env.event == "e6")
- markAsNotUsed(env)
+ markAsFilteredEvent(env)
else
env
}
@@ -1025,7 +1030,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6).map { env =>
if (env.event == "e3" || env.event == "e4" || env.event == "e6")
- markAsNotUsed(env)
+ markAsFilteredEvent(env)
else
env
}
@@ -1214,7 +1219,7 @@ class R2dbcTimestampOffsetProjectionSpec
val envelopes = createEnvelopes(pid, 6).map { env =>
if (env.event == "e3" || env.event == "e4" || env.event == "e6")
- markAsNotUsed(env)
+ markAsFilteredEvent(env)
else
env
}
@@ -1350,7 +1355,7 @@ class R2dbcTimestampOffsetProjectionSpec
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid, 6).map { env =>
if (env.event == "e3" || env.event == "e4" || env.event == "e6")
- markAsNotUsed(env)
+ markAsFilteredEvent(env)
else
env
}
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
index 0d76ebb..96982c9 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
@@ -30,6 +30,7 @@ import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
import pekko.persistence.query.typed.scaladsl.LoadEventQuery
import pekko.persistence.typed.PersistenceId
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
import pekko.projection.BySlicesSourceProvider
import pekko.projection.ProjectionId
import pekko.projection.internal.ManagementState
@@ -110,7 +111,22 @@ class R2dbcTimestampOffsetStoreSpec
env.timestamp,
env.eventMetadata,
env.entityType,
- env.slice)
+ env.slice,
+ env.filtered,
+ source = EnvelopeOrigin.SourceBacktracking)
+
+ def filteredEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
+ new EventEnvelope[String](
+ env.offset,
+ env.persistenceId,
+ env.sequenceNr,
+ env.eventOption,
+ env.timestamp,
+ env.eventMetadata,
+ env.entityType,
+ env.slice,
+ filtered = true,
+ env.source)
def createUpdatedDurableState(
pid: Pid,
@@ -379,7 +395,22 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe
true
offsetStore.addInflight(env2)
// but not when gap
- offsetStore.isAccepted(createEnvelope("p4", 4L, startTime.plusMillis(3),
"e4-4")).futureValue shouldBe false
+ val envP4SeqNr4 = createEnvelope("p4", 4L, startTime.plusMillis(3),
"e4-4")
+ offsetStore.isAccepted(envP4SeqNr4).futureValue shouldBe false
+ // hard reject when gap from backtracking
+ (offsetStore
+ .isAccepted(backtrackingEnvelope(envP4SeqNr4))
+ .failed
+ .futureValue
+ .getMessage should fullyMatch).regex("Rejected envelope from
backtracking.*unexpected sequence number.*")
+ // reject filtered event when gap
+ offsetStore.isAccepted(filteredEnvelope(envP4SeqNr4)).futureValue
shouldBe false
+ // hard reject when filtered event with gap from backtracking
+ (offsetStore
+ .isAccepted(backtrackingEnvelope(filteredEnvelope(envP4SeqNr4)))
+ .failed
+ .futureValue
+ .getMessage should fullyMatch).regex("Rejected envelope from
backtracking.*unexpected sequence number.*")
// and not if later already inflight, seqNr 2 was accepted
offsetStore.isAccepted(createEnvelope("p4", 1L, startTime.plusMillis(1),
"e4-1")).futureValue shouldBe false
@@ -414,6 +445,8 @@ class R2dbcTimestampOffsetStoreSpec
// reject unknown
val env7 = createEnvelope("p5", 7L, startTime.plusMillis(8), "e5-7")
offsetStore.isAccepted(env7).futureValue shouldBe false
+
(offsetStore.isAccepted(backtrackingEnvelope(env7)).failed.futureValue.getMessage
should fullyMatch)
+ .regex("Rejected envelope from backtracking.*unknown sequence
number.*")
// but ok when previous is old
eventTimestampQueryClock.setInstant(startTime.minusSeconds(3600))
val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7")
@@ -425,6 +458,22 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.isAccepted(env9).futureValue shouldBe true
offsetStore.addInflight(env9)
+ // reject unknown filtered
+ val env10 = filteredEnvelope(createEnvelope("p6", 7L,
startTime.plusMillis(10), "e6-7"))
+ offsetStore.isAccepted(env10).futureValue shouldBe false
+ // hard reject when unknown from backtracking
+ (offsetStore
+ .isAccepted(backtrackingEnvelope(env10))
+ .failed
+ .futureValue
+ .getMessage should fullyMatch).regex("Rejected envelope from
backtracking.*unknown sequence number.*")
+ // hard reject when unknown filtered event from backtracking
+ (offsetStore
+ .isAccepted(backtrackingEnvelope(filteredEnvelope(env10)))
+ .failed
+ .futureValue
+ .getMessage should fullyMatch).regex("Rejected envelope from
backtracking.*unknown sequence number.*")
+
// it's keeping the inflight that are not in the "stored" state
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8, "p4" ->
2L, "p5" -> 8)
// and they are removed from inflight once they have been stored
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]