This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a91c5e  Port remaining changes for #357 and 
akka/akka-persistence-r2dbc#348 (#375)
6a91c5e is described below

commit 6a91c5ec7b5624475c9080e297140b16f9d07fcb
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 10:15:52 2026 +0100

    Port remaining changes for #357 and akka/akka-persistence-r2dbc#348 (#375)
    
    * Port changes from akka-persistence-r2dbc PR #348 (filtered event fix)
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/5e0c4694-7a40-473f-af9b-38a34621e7fc
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * move file
    
    * Update envelope-origin.excludes
    
    * Update EventsBySlicePubSubSpec.scala
    
    * Remove skipPubSubTooFarAhead - was from newer Akka version not in ported 
PRs
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/470ac23a-d70a-468f-a7ff-eede5c2667fa
    
    Co-authored-by: pjfanning <[email protected]>
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../r2dbc/internal/EnvelopeOrigin.scala            |  42 ++++++++
 .../pekko/persistence/r2dbc/internal/PubSub.scala  |   4 +-
 .../r2dbc/query/scaladsl/R2dbcReadJournal.scala    |  13 ++-
 .../query/EventsBySliceBacktrackingSpec.scala      |   4 +
 .../r2dbc/query/EventsBySlicePerfSpec.scala        | 117 +++++++++++++++++++--
 .../r2dbc/query/EventsBySlicePubSubSpec.scala      |  56 ++++++----
 .../envelope-origin.excludes                       |  25 +++++
 .../r2dbc/internal/R2dbcOffsetStore.scala          |  32 +++---
 .../r2dbc/internal/R2dbcProjectionImpl.scala       |  29 ++---
 .../r2dbc/R2dbcTimestampOffsetProjectionSpec.scala |  27 +++--
 .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala      |  53 +++++++++-
 11 files changed, 319 insertions(+), 83 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EnvelopeOrigin.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EnvelopeOrigin.scala
new file mode 100644
index 0000000..7a1af02
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EnvelopeOrigin.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import org.apache.pekko
+import pekko.annotation.InternalStableApi
+import pekko.persistence.query.typed.EventEnvelope
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi private[pekko] object EnvelopeOrigin {
+  val SourceQuery = ""
+  val SourceBacktracking = "BT"
+  val SourcePubSub = "PS"
+
+  def fromQuery(env: EventEnvelope[_]): Boolean =
+    env.source == SourceQuery
+
+  def fromBacktracking(env: EventEnvelope[_]): Boolean =
+    env.source == SourceBacktracking
+
+  def fromPubSub(env: EventEnvelope[_]): Boolean =
+    env.source == SourcePubSub
+
+  def isFilteredEvent(env: Any): Boolean =
+    env match {
+      case e: EventEnvelope[_] => e.filtered
+      case _                   => false
+    }
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
index f8d9562..6d901d2 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/PubSub.scala
@@ -125,7 +125,9 @@ import org.slf4j.LoggerFactory
         timestamp.toEpochMilli,
         pr.metadata,
         entityType,
-        slice)
+        slice,
+        filtered = false,
+        source = EnvelopeOrigin.SourcePubSub)
       eventTopic(entityType, slice) ! Topic.Publish(envelope)
     }
   }
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 5be7856..3ead172 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -40,6 +40,7 @@ import pekko.persistence.query.{ EventEnvelope => 
ClassicEventEnvelope }
 import pekko.persistence.r2dbc.QuerySettings
 import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.ContinuousQuery
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
 import pekko.persistence.r2dbc.internal.PubSub
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
 import pekko.persistence.typed.PersistenceId
@@ -84,6 +85,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
     val createEnvelope: (TimestampOffset, SerializedJournalRow) => 
EventEnvelope[Any] = (offset, row) => {
       val event = row.payload.map(payload => 
serialization.deserialize(payload, row.serId, row.serManifest).get)
       val metadata = row.metadata.map(meta => 
serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get)
+      val source = if (event.isDefined) EnvelopeOrigin.SourceQuery else 
EnvelopeOrigin.SourceBacktracking
       new EventEnvelope(
         offset,
         row.persistenceId,
@@ -92,7 +94,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
         row.dbTimestamp.toEpochMilli,
         metadata,
         row.entityType,
-        row.slice)
+        row.slice,
+        filtered = false,
+        source)
     }
 
     val extractOffset: EventEnvelope[Any] => TimestampOffset = env => 
env.offset.asInstanceOf[TimestampOffset]
@@ -196,7 +200,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
           // cache of seen pid/seqNr
           var seen = mutable.LinkedHashSet.empty[(String, Long)]
           env => {
-            if (env.eventOption.isEmpty) {
+            if (EnvelopeOrigin.fromBacktracking(env)) {
               // don't deduplicate from backtracking
               env :: Nil
             } else {
@@ -322,6 +326,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
       row.payload.map(payload => serialization.deserialize(payload, row.serId, 
row.serManifest).get.asInstanceOf[Event])
     val offset = TimestampOffset(row.dbTimestamp, row.readDbTimestamp, 
Map(row.persistenceId -> row.seqNr))
     val metadata = row.metadata.map(meta => 
serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get)
+    val source = if (event.isDefined) EnvelopeOrigin.SourceQuery else 
EnvelopeOrigin.SourceBacktracking
     new EventEnvelope(
       offset,
       row.persistenceId,
@@ -330,7 +335,9 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
       row.dbTimestamp.toEpochMilli,
       metadata,
       row.entityType,
-      row.slice)
+      row.slice,
+      filtered = false,
+      source)
   }
 
   private def deserializeRow(row: SerializedJournalRow): ClassicEventEnvelope 
= {
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index b0dd655..061b6f9 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -29,6 +29,7 @@ import pekko.persistence.r2dbc.QuerySettings
 import pekko.persistence.r2dbc.TestConfig
 import pekko.persistence.r2dbc.TestData
 import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.internal.InstantFactory
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
@@ -114,17 +115,20 @@ class EventsBySliceBacktrackingSpec
       env1.persistenceId shouldBe pid1
       env1.sequenceNr shouldBe 1L
       env1.eventOption shouldBe Some("e1-1")
+      env1.source shouldBe EnvelopeOrigin.SourceQuery
 
       val env2 = result.expectNext()
       env2.persistenceId shouldBe pid1
       env2.sequenceNr shouldBe 2L
       env2.eventOption shouldBe Some("e1-2")
+      env2.source shouldBe EnvelopeOrigin.SourceQuery
 
       // first backtracking query kicks in immediately after the first normal 
query has finished
       // and it also emits duplicates (by design)
       val env3 = result.expectNext()
       env3.persistenceId shouldBe pid1
       env3.sequenceNr shouldBe 1L
+      env3.source shouldBe EnvelopeOrigin.SourceBacktracking
       // event payload isn't included in backtracking results
       env3.eventOption shouldBe None
       // but it can be lazy loaded
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
index 5b12b34..51599dc 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePerfSpec.scala
@@ -27,24 +27,48 @@ import pekko.persistence.query.NoOffset
 import pekko.persistence.query.PersistenceQuery
 import pekko.persistence.r2dbc.TestActors
 import pekko.persistence.r2dbc.TestActors.Persister.Persist
+import pekko.persistence.r2dbc.TestActors.Persister.PersistWithAck
 import pekko.persistence.r2dbc.TestConfig
 import pekko.persistence.r2dbc.TestData
 import pekko.persistence.r2dbc.TestDbLifecycle
+import pekko.persistence.r2dbc.QuerySettings
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import pekko.stream.scaladsl.Sink
+import pekko.stream.scaladsl.Source
+import com.typesafe.config.ConfigFactory
 import org.scalatest.wordspec.AnyWordSpecLike
 
+object EventsBySlicePerfSpec {
+  private val config = ConfigFactory
+    .parseString("""
+    pekko.persistence.r2dbc.journal.publish-events = on
+    pekko.persistence.r2dbc.query {
+      backtracking.enabled = on
+      refresh-interval = 3s
+      #buffer-size = 100
+    }
+    """)
+    .withFallback(TestConfig.config)
+
+  private final case class PidSeqNr(pid: String, seqNr: Long)
+}
+
 class EventsBySlicePerfSpec
-    extends 
ScalaTestWithActorTestKit(TestConfig.backtrackingDisabledConfig.withFallback(TestConfig.config))
+    extends ScalaTestWithActorTestKit(EventsBySlicePerfSpec.config)
     with AnyWordSpecLike
     with TestDbLifecycle
-    with TestData
-    with LogCapturing {
+    with LogCapturing
+    with TestData {
+  import EventsBySlicePerfSpec.PidSeqNr
 
   override def typedSystem: ActorSystem[_] = system
 
   private val query = 
PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
 
+  private lazy val r2dbcQuerySettings =
+    
QuerySettings(testKit.system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
+
   "EventsBySlices performance" should {
 
     "retrieve from several slices" in {
@@ -94,22 +118,97 @@ class EventsBySlicePerfSpec
         val counts: Seq[Future[Int]] = ranges.map { range =>
           query
             .currentEventsBySlices[String](entityType, range.min, range.max, 
NoOffset)
-            .runWith(Sink.fold(0) { case (acc, _) =>
-              if (acc > 0 && acc % 100 == 0)
-                println(s"#$iteration Reading [$acc] events from slices 
[${range.min}-${range.max}] " +
-                  s"took [${(System.nanoTime() - t1) / 1000 / 1000}] ms")
-              acc + 1
+            .runWith(Sink.fold(0) { case (acc, env) =>
+              if (EnvelopeOrigin.fromQuery(env)) {
+                if (acc > 0 && acc % 100 == 0)
+                  println(s"#$iteration Reading [$acc] events from slices 
[${range.min}-${range.max}] " +
+                    s"took [${(System.nanoTime() - t1) / 1000 / 1000}] ms")
+                acc + 1
+              } else {
+                acc
+              }
             })
         }
         implicit val ec: ExecutionContext = testKit.system.executionContext
         val total = Await.result(Future.sequence(counts).map(_.sum), 
30.seconds)
         total shouldBe totalNumberOfEvents
         println(
-          s"#$iteration Reading all [$totalNumberOfEvents] events from 
[${ranges.size}] eventsBySlices " +
+          s"#$iteration Reading all [$totalNumberOfEvents] events from 
[${ranges.size}] slices with currentEventsBySlices " +
           s"took [${(System.nanoTime() - t1) / 1000 / 1000}] ms")
       }
     }
 
+    "write and read concurrently" in {
+      // increase these properties for "real" testing
+      // also, remove LogCapturing and change logback log levels for "real" 
testing
+      val numberOfEventsPerWriter = 20
+      val writeConcurrency = 10
+      val writeRps = 300
+      val iterations = 2
+      val totalNumberOfEvents = writeConcurrency * numberOfEventsPerWriter
+      val verbosePrintLag = false
+
+      implicit val ec: ExecutionContext = testKit.system.executionContext
+
+      val entityType = nextEntityType()
+      val persistenceIds = (1 to writeConcurrency).map(_ => 
nextPid(entityType)).toVector
+
+      (1 to iterations).foreach { iteration =>
+        val t0 = System.nanoTime()
+        val writeProbe = createTestProbe[Done]()
+        val persisters = persistenceIds.map(pid => 
testKit.spawn(TestActors.Persister(pid)))
+        Source(1 to numberOfEventsPerWriter)
+          .mapConcat(n => persisters.map(ref => ref -> n))
+          .throttle(writeRps / 10, 100.millis)
+          .map { case (ref, n) =>
+            ref ! Persist(s"e-$n")
+          }
+          .runWith(Sink.ignore)
+          .foreach { _ =>
+            // stop them at the end
+            persisters.foreach(_ ! TestActors.Persister.Stop(writeProbe.ref))
+          }
+
+        val done: Future[Done] =
+          query
+            .eventsBySlices[String](entityType, 0, 
persistenceExt.numberOfSlices - 1, NoOffset)
+            .scan(Set.empty[PidSeqNr]) { case (acc, env) =>
+              val newAcc = acc + PidSeqNr(env.persistenceId, env.sequenceNr)
+
+              if (verbosePrintLag) {
+                val duplicate = if (newAcc.size == acc.size) " (duplicate)" 
else ""
+                val lagMillis = System.currentTimeMillis() - env.timestamp
+                val delayed =
+                  (EnvelopeOrigin.fromPubSub(env) && lagMillis > 50) ||
+                  (EnvelopeOrigin.fromQuery(env) && lagMillis > 
r2dbcQuerySettings.refreshInterval.toMillis + 300) ||
+                  (EnvelopeOrigin.fromPubSub(
+                    env) && lagMillis > 
r2dbcQuerySettings.backtrackingWindow.toMillis / 2 + 300)
+                if (delayed)
+                  println(
+                    s"# received ${newAcc.size}$duplicate from ${env.source}: 
${env.persistenceId} seqNr ${env.sequenceNr}, lag $lagMillis ms")
+              }
+
+              if (newAcc.size != acc.size && (newAcc.size % 100 == 0))
+                println(s"#$iteration Reading [${newAcc.size}] events " +
+                  s"took [${(System.nanoTime() - t0) / 1000 / 1000}] ms")
+              newAcc
+
+            }
+            .takeWhile(_.size < totalNumberOfEvents)
+            .runWith(Sink.ignore)
+
+        writeProbe.receiveMessages(persisters.size, (totalNumberOfEvents / 
writeRps).seconds + 10.seconds)
+        println(
+          s"#$iteration Persisting all [$totalNumberOfEvents] events from 
[${persistenceIds.size}] persistent " +
+          s"actors took [${(System.nanoTime() - t0) / 1000 / 1000}] ms")
+
+        Await.result(done, 30.seconds)
+        println(
+          s"#$iteration Reading all [$totalNumberOfEvents] events with 
eventsBySlices " +
+          s"took [${(System.nanoTime() - t0) / 1000 / 1000}] ms")
+      }
+    }
+
   }
 
 }
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index edbb96f..660bdda 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -36,10 +36,11 @@ import 
pekko.persistence.r2dbc.TestActors.Persister.PersistWithAck
 import pekko.persistence.r2dbc.TestConfig
 import pekko.persistence.r2dbc.TestData
 import pekko.persistence.r2dbc.TestDbLifecycle
-import pekko.persistence.r2dbc.internal.InstantFactory
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
 import pekko.persistence.r2dbc.internal.PubSub
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import pekko.persistence.typed.PersistenceId
+import pekko.stream.scaladsl.Keep
 import pekko.stream.scaladsl.Sink
 import pekko.stream.scaladsl.Source
 import pekko.stream.testkit.TestSubscriber
@@ -90,26 +91,45 @@ class EventsBySlicePubSubSpec
     val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)
   }
 
-  private def createEnvelope(pid: PersistenceId, seqNr: Long, evt: String): 
EventEnvelope[String] = {
-    val now = InstantFactory.now()
+  private def createEnvelope(
+      pid: PersistenceId,
+      seqNr: Long,
+      evt: String,
+      time: Instant = Instant.now()): EventEnvelope[String] = {
     EventEnvelope(
-      TimestampOffset(now, Map(pid.id -> seqNr)),
+      TimestampOffset(time, time, Map(pid.id -> seqNr)),
       pid.id,
       seqNr,
       evt,
-      now.toEpochMilli,
+      time.toEpochMilli,
       pid.entityTypeHint,
-      query.sliceForPersistenceId(pid.id))
+      query.sliceForPersistenceId(pid.id),
+      filtered = false,
+      source = EnvelopeOrigin.SourcePubSub)
   }
 
+  def backtrackingEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
+    new EventEnvelope[String](
+      env.offset,
+      env.persistenceId,
+      env.sequenceNr,
+      eventOption = None,
+      env.timestamp,
+      env.eventMetadata,
+      env.entityType,
+      env.slice,
+      filtered = false,
+      source = EnvelopeOrigin.SourceBacktracking)
+
   private val entityType = nextEntityType()
   private val pidA = PersistenceId(entityType, "A")
   private val pidB = PersistenceId(entityType, "B")
-  private val envA1 = createEnvelope(pidA, 1L, "a1")
-  private val envA2 = createEnvelope(pidA, 2L, "a2")
-  private val envA3 = createEnvelope(pidA, 3L, "a3")
-  private val envB1 = createEnvelope(pidB, 1L, "b1")
-  private val envB2 = createEnvelope(pidB, 2L, "b2")
+  val now = Instant.now()
+  private val envA1 = createEnvelope(pidA, 1L, "a1", now)
+  private val envA2 = createEnvelope(pidA, 2L, "a2", now.plusMillis(1))
+  private val envA3 = createEnvelope(pidA, 3L, "a3", now.plusMillis(2))
+  private val envB1 = createEnvelope(pidB, 1L, "b1", now.plusMillis(3))
+  private val envB2 = createEnvelope(pidB, 2L, "b2", now.plusMillis(4))
 
   "EventsBySlices pub-sub" should {
 
@@ -133,7 +153,9 @@ class EventsBySlicePubSubSpec
 
       // 10 was requested
       for (i <- 1 to 10) {
-        result.expectNext().event shouldBe s"e-$i"
+        val env = result.expectNext()
+        env.event shouldBe s"e-$i"
+        env.source shouldBe EnvelopeOrigin.SourcePubSub
       }
       result.expectNoMessage()
 
@@ -166,15 +188,7 @@ class EventsBySlicePubSubSpec
     }
 
     "not deduplicate from backtracking" in {
-      val envA2back = new EventEnvelope[String](
-        envA2.offset,
-        envA2.persistenceId,
-        envA2.sequenceNr,
-        eventOption = None,
-        envA2.timestamp,
-        envA2.eventMetadata,
-        envA2.entityType,
-        envA2.slice)
+      val envA2back = backtrackingEnvelope(envA2)
       val out = Source(List(envA1, envA2, envB1, envA2back, envB2))
         .via(query.deduplicate(capacity = 10))
         .runWith(Sink.seq)
diff --git 
a/projection/src/main/mima-filters/2.0.x.backwards.excludes/envelope-origin.excludes
 
b/projection/src/main/mima-filters/2.0.x.backwards.excludes/envelope-origin.excludes
new file mode 100644
index 0000000..b8338e1
--- /dev/null
+++ 
b/projection/src/main/mima-filters/2.0.x.backwards.excludes/envelope-origin.excludes
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Caused by porting https://github.com/akka/akka-persistence-r2dbc/pull/348
+ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore$RecordWithOffset$")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.copy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.unapply")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.copy*")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset.envelopeLoaded")
diff --git 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
index c85c50f..01d26c8 100644
--- 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++ 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -38,6 +38,7 @@ import pekko.persistence.query.UpdatedDurableState
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
 import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.typed.PersistenceId
@@ -63,7 +64,8 @@ object R2dbcOffsetStore {
       record: Record,
       offset: TimestampOffset,
       strictSeqNr: Boolean,
-      envelopeLoaded: Boolean)
+      fromBacktracking: Boolean,
+      fromPubSub: Boolean)
 
   object State {
     val empty: State = State(Map.empty, Vector.empty, Instant.EPOCH)
@@ -658,14 +660,14 @@ private[projection] class R2dbcOffsetStore(
       val prevSeqNr = currentInflight.getOrElse(pid, 
currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))
 
       def logUnexpected(): Unit = {
-        if (viaPubSub(recordWithOffset.offset))
+        if (recordWithOffset.fromPubSub)
           logger.debug(
             "Rejecting pub-sub envelope, unexpected sequence number [{}] for 
pid [{}], previous sequence number [{}]. Offset: {}",
             seqNr: java.lang.Long,
             pid,
             prevSeqNr: java.lang.Long,
             recordWithOffset.offset)
-        else if (recordWithOffset.envelopeLoaded)
+        else if (!recordWithOffset.fromBacktracking)
           logger.debug(
             "Rejecting unexpected sequence number [{}] for pid [{}], previous 
sequence number [{}]. Offset: {}",
             seqNr: java.lang.Long,
@@ -682,13 +684,13 @@ private[projection] class R2dbcOffsetStore(
       }
 
       def logUnknown(): Unit = {
-        if (viaPubSub(recordWithOffset.offset)) {
+        if (recordWithOffset.fromPubSub) {
           logger.debug(
             "Rejecting pub-sub envelope, unknown sequence number [{}] for pid 
[{}] (might be accepted later): {}",
             seqNr: java.lang.Long,
             pid,
             recordWithOffset.offset)
-        } else if (recordWithOffset.envelopeLoaded) {
+        } else if (!recordWithOffset.fromBacktracking) {
           // This may happen rather frequently when using `publish-events`, 
after reconnecting and such.
           logger.debug(
             "Rejecting unknown sequence number [{}] for pid [{}] (might be 
accepted later): {}",
@@ -713,7 +715,7 @@ private[projection] class R2dbcOffsetStore(
           // currentInFlight contains those that have been processed or about 
to be processed in Flow,
           // but offset not saved yet => ok to handle as duplicate
           FutureFalse
-        } else if (recordWithOffset.envelopeLoaded) {
+        } else if (!recordWithOffset.fromBacktracking) {
           logUnexpected()
           FutureFalse
         } else {
@@ -745,7 +747,7 @@ private[projection] class R2dbcOffsetStore(
                 previousTimestamp,
                 before)
               true
-            } else if (recordWithOffset.envelopeLoaded) {
+            } else if (!recordWithOffset.fromBacktracking) {
               logUnknown()
               false
             } else {
@@ -777,13 +779,6 @@ private[projection] class R2dbcOffsetStore(
     }
   }
 
-  private def viaPubSub(offset: Offset): Boolean = {
-    offset match {
-      case t: TimestampOffset => t.timestamp == t.readTimestamp
-      case _                  => false
-    }
-  }
-
   @tailrec final def addInflight[Envelope](envelope: Envelope): Unit = {
     createRecordWithOffset(envelope) match {
       case Some(recordWithOffset) =>
@@ -1013,7 +1008,8 @@ private[projection] class R2dbcOffsetStore(
             Record(eventEnvelope.persistenceId, eventEnvelope.sequenceNr, 
timestampOffset.timestamp),
             timestampOffset,
             strictSeqNr = true,
-            envelopeLoaded = eventEnvelope.eventOption.isDefined))
+            fromBacktracking = EnvelopeOrigin.fromBacktracking(eventEnvelope),
+            fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope)))
       case change: UpdatedDurableState[_] if 
change.offset.isInstanceOf[TimestampOffset] =>
         val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
         Some(
@@ -1021,7 +1017,8 @@ private[projection] class R2dbcOffsetStore(
             Record(change.persistenceId, change.revision, 
timestampOffset.timestamp),
             timestampOffset,
             strictSeqNr = false,
-            envelopeLoaded = change.value != null))
+            fromBacktracking = change.value == null,
+            fromPubSub = false))
       case change: DeletedDurableState[_] if 
change.offset.isInstanceOf[TimestampOffset] =>
         val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
         Some(
@@ -1029,7 +1026,8 @@ private[projection] class R2dbcOffsetStore(
             Record(change.persistenceId, change.revision, 
timestampOffset.timestamp),
             timestampOffset,
             strictSeqNr = false,
-            envelopeLoaded = true))
+            fromBacktracking = false,
+            fromPubSub = false))
       case change: DurableStateChange[_] if 
change.offset.isInstanceOf[TimestampOffset] =>
         // in case additional types are added
         throw new IllegalArgumentException(
diff --git 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
index 0ee478d..ce2ff55 100644
--- 
a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
+++ 
b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala
@@ -23,7 +23,6 @@ import scala.util.control.NonFatal
 
 import org.apache.pekko
 import pekko.Done
-import pekko.NotUsed
 import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
 import pekko.event.Logging
@@ -32,6 +31,7 @@ import pekko.persistence.query.DeletedDurableState
 import pekko.persistence.query.UpdatedDurableState
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.query.typed.scaladsl.LoadEventQuery
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.persistence.state.scaladsl.DurableStateStore
 import pekko.persistence.state.scaladsl.GetObjectResult
@@ -77,6 +77,7 @@ import org.slf4j.LoggerFactory
  */
 @InternalApi
 private[projection] object R2dbcProjectionImpl {
+  import EnvelopeOrigin.{ fromBacktracking, isFilteredEvent }
   val log: Logger = LoggerFactory.getLogger(classOf[R2dbcProjectionImpl[_, _]])
 
   private val FutureDone: Future[Done] = Future.successful(Done)
@@ -96,7 +97,8 @@ private[projection] object R2dbcProjectionImpl {
   def loadEnvelope[Envelope](env: Envelope, sourceProvider: SourceProvider[_, 
Envelope])(implicit
       ec: ExecutionContext): Future[Envelope] = {
     env match {
-      case eventEnvelope: EventEnvelope[_] if 
eventEnvelope.eventOption.isEmpty && !skipEnvelope(eventEnvelope) =>
+      case eventEnvelope: EventEnvelope[_]
+          if fromBacktracking(eventEnvelope) && 
eventEnvelope.eventOption.isEmpty && !eventEnvelope.filtered =>
         val pid = eventEnvelope.persistenceId
         val seqNr = eventEnvelope.sequenceNr
         (sourceProvider match {
@@ -166,7 +168,7 @@ private[projection] object R2dbcProjectionImpl {
         override def process(envelope: Envelope): Future[Done] = {
           offsetStore.isAccepted(envelope).flatMap {
             case true =>
-              if (skipEnvelope(envelope)) {
+              if (isFilteredEvent(envelope)) {
                 val offset = sourceProvider.extractOffset(envelope)
                 offsetStore.saveOffset(offset)
               } else {
@@ -207,7 +209,7 @@ private[projection] object R2dbcProjectionImpl {
             Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env, 
sourceProvider))).flatMap {
               loadedEnvelopes =>
                 val offsets = 
loadedEnvelopes.iterator.map(sourceProvider.extractOffset).toVector
-                val filteredEnvelopes = loadedEnvelopes.filterNot(skipEnvelope)
+                val filteredEnvelopes = 
loadedEnvelopes.filterNot(isFilteredEvent)
                 if (filteredEnvelopes.isEmpty) {
                   offsetStore.saveOffsets(offsets)
                 } else {
@@ -236,7 +238,7 @@ private[projection] object R2dbcProjectionImpl {
         override def process(envelope: Envelope): Future[Done] = {
           offsetStore.isAccepted(envelope).flatMap {
             case true =>
-              if (skipEnvelope(envelope)) {
+              if (isFilteredEvent(envelope)) {
                 offsetStore.addInflight(envelope)
                 FutureDone
               } else {
@@ -269,7 +271,7 @@ private[projection] object R2dbcProjectionImpl {
         override def process(envelope: Envelope): Future[Done] = {
           offsetStore.isAccepted(envelope).flatMap {
             case true =>
-              if (skipEnvelope(envelope)) {
+              if (isFilteredEvent(envelope)) {
                 offsetStore.addInflight(envelope)
                 FutureDone
               } else {
@@ -304,7 +306,7 @@ private[projection] object R2dbcProjectionImpl {
           } else {
             Future.sequence(acceptedEnvelopes.map(env => loadEnvelope(env, 
sourceProvider))).flatMap {
               loadedEnvelopes =>
-                val filteredEnvelopes = loadedEnvelopes.filterNot(skipEnvelope)
+                val filteredEnvelopes = 
loadedEnvelopes.filterNot(isFilteredEvent)
                 if (filteredEnvelopes.isEmpty) {
                   offsetStore.addInflights(loadedEnvelopes)
                   FutureDone
@@ -337,7 +339,7 @@ private[projection] object R2dbcProjectionImpl {
           .isAccepted(env)
           .flatMap { ok =>
             if (ok) {
-              if (skipEnvelope(env)) {
+              if (isFilteredEvent(env)) {
                 log.info("atLeastOnceFlow doesn't support of skipping 
envelopes. Envelope [{}] still emitted.", env)
               }
               loadEnvelope(env, sourceProvider).map { loadedEnvelope =>
@@ -378,17 +380,6 @@ private[projection] object R2dbcProjectionImpl {
       delegate.stop()
   }
 
-  private def skipEnvelope[Envelope](env: Envelope): Boolean = {
-    env match {
-      case e: EventEnvelope[_] =>
-        e.eventMetadata match {
-          case Some(NotUsed) => true
-          case _             => false
-        }
-      case _ => false
-    }
-  }
-
 }
 
 /**
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
index 00048b7..3940882 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala
@@ -38,6 +38,7 @@ import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
 import pekko.persistence.query.typed.scaladsl.LoadEventQuery
 import pekko.persistence.typed.PersistenceId
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
 import pekko.projection.BySlicesSourceProvider
 import pekko.projection.HandlerRecoveryStrategy
 import pekko.projection.ProjectionBehavior
@@ -279,7 +280,9 @@ class R2dbcTimestampOffsetProjectionSpec
       env.timestamp,
       env.eventMetadata,
       env.entityType,
-      env.slice)
+      env.slice,
+      env.filtered,
+      source = EnvelopeOrigin.SourceBacktracking)
 
   def createEnvelopes(pid: Pid, numberOfEvents: Int): 
immutable.IndexedSeq[EventEnvelope[String]] = {
     (1 to numberOfEvents).map { n =>
@@ -360,16 +363,18 @@ class R2dbcTimestampOffsetProjectionSpec
     }
   }
 
-  def markAsNotUsed[A](env: EventEnvelope[A]): EventEnvelope[A] = {
-    new EventEnvelope(
+  def markAsFilteredEvent[A](env: EventEnvelope[A]): EventEnvelope[A] = {
+    new EventEnvelope[A](
       env.offset,
       env.persistenceId,
       env.sequenceNr,
       env.eventOption,
       env.timestamp,
-      eventMetadata = Some(NotUsed),
+      env.eventMetadata,
       env.entityType,
-      env.slice)
+      env.slice,
+      filtered = true,
+      env.source)
   }
 
   "A R2DBC exactly-once projection with TimestampOffset" must {
@@ -590,7 +595,7 @@ class R2dbcTimestampOffsetProjectionSpec
 
       val envelopes = createEnvelopes(pid, 6).map { env =>
         if (env.event == "e3" || env.event == "e4" || env.event == "e6")
-          markAsNotUsed(env)
+          markAsFilteredEvent(env)
         else
           env
       }
@@ -715,7 +720,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6).map { env =>
         if (env.event == "e3" || env.event == "e4" || env.event == "e6")
-          markAsNotUsed(env)
+          markAsFilteredEvent(env)
         else
           env
       }
@@ -874,7 +879,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6).map { env =>
         if (env.event == "e3" || env.event == "e4" || env.event == "e6")
-          markAsNotUsed(env)
+          markAsFilteredEvent(env)
         else
           env
       }
@@ -1025,7 +1030,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6).map { env =>
         if (env.event == "e3" || env.event == "e4" || env.event == "e6")
-          markAsNotUsed(env)
+          markAsFilteredEvent(env)
         else
           env
       }
@@ -1214,7 +1219,7 @@ class R2dbcTimestampOffsetProjectionSpec
 
       val envelopes = createEnvelopes(pid, 6).map { env =>
         if (env.event == "e3" || env.event == "e4" || env.event == "e6")
-          markAsNotUsed(env)
+          markAsFilteredEvent(env)
         else
           env
       }
@@ -1350,7 +1355,7 @@ class R2dbcTimestampOffsetProjectionSpec
       val projectionId = genRandomProjectionId()
       val envelopes = createEnvelopes(pid, 6).map { env =>
         if (env.event == "e3" || env.event == "e4" || env.event == "e6")
-          markAsNotUsed(env)
+          markAsFilteredEvent(env)
         else
           env
       }
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
index 0d76ebb..96982c9 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala
@@ -30,6 +30,7 @@ import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
 import pekko.persistence.query.typed.scaladsl.LoadEventQuery
 import pekko.persistence.typed.PersistenceId
+import pekko.persistence.r2dbc.internal.EnvelopeOrigin
 import pekko.projection.BySlicesSourceProvider
 import pekko.projection.ProjectionId
 import pekko.projection.internal.ManagementState
@@ -110,7 +111,22 @@ class R2dbcTimestampOffsetStoreSpec
       env.timestamp,
       env.eventMetadata,
       env.entityType,
-      env.slice)
+      env.slice,
+      env.filtered,
+      source = EnvelopeOrigin.SourceBacktracking)
+
+  def filteredEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
+    new EventEnvelope[String](
+      env.offset,
+      env.persistenceId,
+      env.sequenceNr,
+      env.eventOption,
+      env.timestamp,
+      env.eventMetadata,
+      env.entityType,
+      env.slice,
+      filtered = true,
+      env.source)
 
   def createUpdatedDurableState(
       pid: Pid,
@@ -379,7 +395,22 @@ class R2dbcTimestampOffsetStoreSpec
       offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe 
true
       offsetStore.addInflight(env2)
       // but not when gap
-      offsetStore.isAccepted(createEnvelope("p4", 4L, startTime.plusMillis(3), 
"e4-4")).futureValue shouldBe false
+      val envP4SeqNr4 = createEnvelope("p4", 4L, startTime.plusMillis(3), 
"e4-4")
+      offsetStore.isAccepted(envP4SeqNr4).futureValue shouldBe false
+      // hard reject when gap from backtracking
+      (offsetStore
+        .isAccepted(backtrackingEnvelope(envP4SeqNr4))
+        .failed
+        .futureValue
+        .getMessage should fullyMatch).regex("Rejected envelope from 
backtracking.*unexpected sequence number.*")
+      // reject filtered event when gap
+      offsetStore.isAccepted(filteredEnvelope(envP4SeqNr4)).futureValue 
shouldBe false
+      // hard reject when filtered event with gap from backtracking
+      (offsetStore
+        .isAccepted(backtrackingEnvelope(filteredEnvelope(envP4SeqNr4)))
+        .failed
+        .futureValue
+        .getMessage should fullyMatch).regex("Rejected envelope from 
backtracking.*unexpected sequence number.*")
       // and not if later already inflight, seqNr 2 was accepted
       offsetStore.isAccepted(createEnvelope("p4", 1L, startTime.plusMillis(1), 
"e4-1")).futureValue shouldBe false
 
@@ -414,6 +445,8 @@ class R2dbcTimestampOffsetStoreSpec
       // reject unknown
       val env7 = createEnvelope("p5", 7L, startTime.plusMillis(8), "e5-7")
       offsetStore.isAccepted(env7).futureValue shouldBe false
+      
(offsetStore.isAccepted(backtrackingEnvelope(env7)).failed.futureValue.getMessage
 should fullyMatch)
+        .regex("Rejected envelope from backtracking.*unknown sequence 
number.*")
       // but ok when previous is old
       eventTimestampQueryClock.setInstant(startTime.minusSeconds(3600))
       val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7")
@@ -425,6 +458,22 @@ class R2dbcTimestampOffsetStoreSpec
       offsetStore.isAccepted(env9).futureValue shouldBe true
       offsetStore.addInflight(env9)
 
+      // reject unknown filtered
+      val env10 = filteredEnvelope(createEnvelope("p6", 7L, 
startTime.plusMillis(10), "e6-7"))
+      offsetStore.isAccepted(env10).futureValue shouldBe false
+      // hard reject when unknown from backtracking
+      (offsetStore
+        .isAccepted(backtrackingEnvelope(env10))
+        .failed
+        .futureValue
+        .getMessage should fullyMatch).regex("Rejected envelope from 
backtracking.*unknown sequence number.*")
+      // hard reject when unknown filtered event from backtracking
+      (offsetStore
+        .isAccepted(backtrackingEnvelope(filteredEnvelope(env10)))
+        .failed
+        .futureValue
+        .getMessage should fullyMatch).regex("Rejected envelope from 
backtracking.*unknown sequence number.*")
+
       // it's keeping the inflight that are not in the "stored" state
       offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8, "p4" -> 
2L, "p5" -> 8)
       // and they are removed from inflight once they have been stored


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to