This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new aaf9e8d Match EventSourcedPubSubSpec to akka reference implementation (#383)
aaf9e8d is described below
commit aaf9e8dfa65722c3ed1cb76280d630a480b45a60
Author: PJ Fanning <[email protected]>
AuthorDate: Thu May 21 10:01:00 2026 +0100
Match EventSourcedPubSubSpec to akka reference implementation (#383)
Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-persistence-r2dbc/sessions/76672d6a-08ef-4a06-9482-14a2ee33dec9
Co-authored-by: copilot-swe-agent[bot] <[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../projection/r2dbc/EventSourcedPubSubSpec.scala | 41 +++++++++++-----------
1 file changed, 20 insertions(+), 21 deletions(-)
diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
index b31ae09..17f6967 100644
--- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
+++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
@@ -26,6 +26,7 @@ import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.testkit.typed.scaladsl.TestProbe
import pekko.actor.typed.ActorRef
import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.LoggerOps
import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
@@ -43,23 +44,23 @@ import org.slf4j.LoggerFactory
object EventSourcedPubSubSpec {
- val config: Config = ConfigFactory.load(
- ConfigFactory
- .parseString("""
- pekko.persistence.r2dbc {
- journal.publish-events = on
+ val config: Config = ConfigFactory
+ .parseString("""
+ pekko.persistence.r2dbc {
+ journal.publish-events = on
+ query {
refresh-interval = 3 seconds
- # simulate lost messages by overflowing the buffer
- buffer-size = 10
+ # simulate lost messages by overflowing the buffer
+ buffer-size = 10
- backtracking {
- behind-current-time = 100 millis
- window = 20 seconds
- }
+ backtracking {
+ behind-current-time = 5 seconds
+ window = 20 seconds
+ }
}
- """)
- .withFallback(TestConfig.unresolvedConfig)
- )
+ }
+ """)
+ .withFallback(TestConfig.config)
final case class Processed(projectionId: ProjectionId, envelope: EventEnvelope[String])
@@ -74,13 +75,13 @@ object EventSourcedPubSubSpec {
whenDone(envelope).map { _ =>
val timestampOffset = envelope.offset.asInstanceOf[TimestampOffset]
val directReplication = timestampOffset.timestamp == timestampOffset.readTimestamp
- log.debug(
+ log.debugN(
"{} Processed {}, pid {}, seqNr {}, direct {}",
projectionId.key,
envelope.event,
envelope.persistenceId,
- envelope.sequenceNr: java.lang.Long,
- directReplication: java.lang.Boolean)
+ envelope.sequenceNr,
+ directReplication)
probe ! Processed(projectionId, envelope)
Done
}
@@ -175,13 +176,11 @@ class EventSourcedPubSubSpec
spawn(Persister(persistenceId), s"p$n")
}
- // write some before starting the projections, with ack to ensure they are all in the DB
- val ackProbe = createTestProbe[Done]()
+ // write some before starting the projections
(1 to 10).foreach { n =>
val p = n % numberOfEntities
- entities(p) ! Persister.PersistWithAck(mkEvent(n), ackProbe.ref)
+ entities(p) ! Persister.Persist(mkEvent(n))
}
- (1 to 10).foreach { _ => ackProbe.receiveMessage(10.seconds) }
val projectionName = UUID.randomUUID().toString
val processedProbe = createTestProbe[Processed]()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]