This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new aaf9e8d  Match EventSourcedPubSubSpec to akka reference implementation (#383)
aaf9e8d is described below

commit aaf9e8dfa65722c3ed1cb76280d630a480b45a60
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 21 10:01:00 2026 +0100

    Match EventSourcedPubSubSpec to akka reference implementation (#383)
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/76672d6a-08ef-4a06-9482-14a2ee33dec9
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../projection/r2dbc/EventSourcedPubSubSpec.scala  | 41 +++++++++++-----------
 1 file changed, 20 insertions(+), 21 deletions(-)

diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
index b31ae09..17f6967 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
@@ -26,6 +26,7 @@ import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.testkit.typed.scaladsl.TestProbe
 import pekko.actor.typed.ActorRef
 import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.LoggerOps
 import pekko.persistence.query.TimestampOffset
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
@@ -43,23 +44,23 @@ import org.slf4j.LoggerFactory
 
 object EventSourcedPubSubSpec {
 
-  val config: Config = ConfigFactory.load(
-    ConfigFactory
-      .parseString("""
-      pekko.persistence.r2dbc {
-        journal.publish-events = on
+  val config: Config = ConfigFactory
+    .parseString("""
+    pekko.persistence.r2dbc {
+      journal.publish-events = on
+      query {
         refresh-interval = 3 seconds
-          # simulate lost messages by overflowing the buffer
-          buffer-size = 10
+        # simulate lost messages by overflowing the buffer
+        buffer-size = 10
 
-          backtracking {
-            behind-current-time = 100 millis
-            window = 20 seconds
-          }
+        backtracking {
+          behind-current-time = 5 seconds
+          window = 20 seconds
+        }
       }
-      """)
-      .withFallback(TestConfig.unresolvedConfig)
-  )
+    }
+    """)
+    .withFallback(TestConfig.config)
 
   final case class Processed(projectionId: ProjectionId, envelope: EventEnvelope[String])
 
@@ -74,13 +75,13 @@ object EventSourcedPubSubSpec {
       whenDone(envelope).map { _ =>
         val timestampOffset = envelope.offset.asInstanceOf[TimestampOffset]
         val directReplication = timestampOffset.timestamp == timestampOffset.readTimestamp
-        log.debug(
+        log.debugN(
           "{} Processed {}, pid {}, seqNr {}, direct {}",
           projectionId.key,
           envelope.event,
           envelope.persistenceId,
-          envelope.sequenceNr: java.lang.Long,
-          directReplication: java.lang.Boolean)
+          envelope.sequenceNr,
+          directReplication)
         probe ! Processed(projectionId, envelope)
         Done
       }
@@ -175,13 +176,11 @@ class EventSourcedPubSubSpec
         spawn(Persister(persistenceId), s"p$n")
       }
 
-      // write some before starting the projections, with ack to ensure they are all in the DB
-      val ackProbe = createTestProbe[Done]()
+      // write some before starting the projections
       (1 to 10).foreach { n =>
         val p = n % numberOfEntities
-        entities(p) ! Persister.PersistWithAck(mkEvent(n), ackProbe.ref)
+        entities(p) ! Persister.Persist(mkEvent(n))
       }
-      (1 to 10).foreach { _ => ackProbe.receiveMessage(10.seconds) }
 
       val projectionName = UUID.randomUUID().toString
       val processedProbe = createTestProbe[Processed]()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to