This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new 5f7c791e12 Fix AsyncWriteJournalResponseOrderSpec with global
dispatcher (#2477) (#2485)
5f7c791e12 is described below
commit 5f7c791e125ff4e2e5e7fe748a48507156e9298b
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Nov 12 23:16:13 2025 +0100
Fix AsyncWriteJournalResponseOrderSpec with global dispatcher (#2477)
(#2485)
* Fix AsyncWriteJournalResponseOrderSpec with global dispatcher
This fixes a random test failure when the global dispatcher used
as a persistence plugin dispatcher.
This change should unblock https://github.com/apache/pekko/pull/2434
Also adjusted wording changes for write-response-global-order from
https://github.com/apache/pekko/pull/2437
The test relied on an ordering guarantee which wasn't really there.
Even though asyncWriteMessages promises were completed in some particular
order, AsyncWriteJournal sent out messages in an onComplete callback scheduled
on the actor dispatcher, yielding random message order.
The test failure shouldn't be a problem for production, non-test, Pekko
code, because nothing there relies
on write response ordering guarantees between independent persistent
actors. And for an individual persistent actor writes
- the persistent actor implementation internals handle ordering there
correctly regardless of the plugin response ordering.
* FIXUP
Co-authored-by: Mikhail Sokolov <[email protected]>
---
persistence/src/main/resources/reference.conf | 2 -
.../AsyncWriteJournalResponseOrderSpec.scala | 102 ++++++++++++---------
2 files changed, 60 insertions(+), 44 deletions(-)
diff --git a/persistence/src/main/resources/reference.conf
b/persistence/src/main/resources/reference.conf
index 8c7bbcd559..191c83dc89 100644
--- a/persistence/src/main/resources/reference.conf
+++ b/persistence/src/main/resources/reference.conf
@@ -164,8 +164,6 @@ pekko.persistence {
#
# When this setting is "off", the journal plugin is allowed to send back
write responses
# in any order. This can improve throughput and reduce latency under
load.
- # When using "off", changes to the dispatcher configuration of the
journal plugin
- # can lead to an increased number of out-of-order responses.
#
# The old behavior is still enabled by default ("on").
write-response-global-order = on
diff --git
a/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
b/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
index a0faa633c0..3f3537c0ee 100644
---
a/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
+++
b/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
@@ -17,12 +17,13 @@
package org.apache.pekko.persistence.journal
-import org.apache.pekko.persistence.{ AtomicWrite, JournalProtocol,
PersistenceSpec, PersistentRepr }
-import org.apache.pekko.testkit.ImplicitSender
+import
org.apache.pekko.persistence.journal.AsyncWriteJournalResponseOrderSpec._
-import scala.collection.immutable
+import scala.collection.{ immutable, mutable }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.util.Try
+import org.apache.pekko.persistence.{ AtomicWrite, JournalProtocol,
PersistenceSpec, PersistentRepr }
+import org.apache.pekko.testkit.ImplicitSender
/**
* Verifies write response ordering logic for [[AsyncWriteJournal]].
@@ -34,90 +35,103 @@ class AsyncWriteJournalResponseOrderSpec
PersistenceSpec.config(
plugin = "", // we will provide explicit plugin IDs later
test = classOf[AsyncWriteJournalResponseOrderSpec].getSimpleName,
+ // using the default system dispatcher to make sure
write-response-global-order works with it
+ // see: https://github.com/apache/pekko/pull/2434
extraConfig = Some(
s"""
- |pekko.persistence.journal.reverse-plugin {
+ |${ControlledWriteCompletionPlugin.BaseId} {
| with-global-order {
- | class =
"${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}"
- |
+ | class = "${classOf[ControlledWriteCompletionPlugin].getName}"
+ | plugin-dispatcher = "pekko.actor.default-dispatcher"
| write-response-global-order = on
| }
| no-global-order {
- | class =
"${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}"
- |
+ | class = "${classOf[ControlledWriteCompletionPlugin].getName}"
+ | plugin-dispatcher = "pekko.actor.default-dispatcher"
| write-response-global-order = off
| }
|}
|""".stripMargin
))) with ImplicitSender {
- import AsyncWriteJournalResponseOrderSpec._
-
"AsyncWriteJournal" must {
"return write responses in request order if global response order is
enabled" in {
val pluginRef =
- extension.journalFor(journalPluginId =
"pekko.persistence.journal.reverse-plugin.with-global-order")
-
- pluginRef ! mkWriteMessages(1)
- pluginRef ! mkWriteMessages(2)
- pluginRef ! mkWriteMessages(3)
-
- pluginRef ! CompleteWriteOps
-
- getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(1, 2, 3)
+ extension.journalFor(journalPluginId =
s"${ControlledWriteCompletionPlugin.BaseId}.with-global-order")
+
+ // request writes for persistence Ids 1..9
+ 1.to(9).foreach { persistenceId =>
+ pluginRef ! mkWriteMessages(persistenceId = persistenceId)
+ }
+ // complete writes for all but the first persistence ID in reverse order
+ pluginRef ! CompleteWriteOps(persistenceIdsInOrder =
2.to(9).toVector.reverse)
+ // AsyncWriteJournal should hold the responses yet to preserve the
response order
+ expectNoMessage()
+ // complete write for the first persistence ID
+ pluginRef ! CompleteWriteOps(persistenceIdsInOrder = Vector(1))
+ // now we should receive all responses in the order in which they have
been requested
+ getPersistenceIdsFromResponses(receiveN(18)) shouldEqual 1.to(9).toVector
}
- "return write responses in completion order if global response order is
disabled" in {
+ "return write responses as soon as the operation is complete if global
response order is disabled" in {
val pluginRef =
- extension.journalFor(journalPluginId =
"pekko.persistence.journal.reverse-plugin.no-global-order")
-
- pluginRef ! mkWriteMessages(1)
- pluginRef ! mkWriteMessages(2)
- pluginRef ! mkWriteMessages(3)
-
- pluginRef ! CompleteWriteOps
-
- getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(3, 2, 1)
+ extension.journalFor(journalPluginId =
s"${ControlledWriteCompletionPlugin.BaseId}.no-global-order")
+
+ // request writes for persistence Ids 1..9
+ 1.to(9).foreach { persistenceId =>
+ pluginRef ! mkWriteMessages(persistenceId = persistenceId)
+ }
+ // complete writes for all but the first persistence ID in reverse order
+ pluginRef ! CompleteWriteOps(persistenceIdsInOrder =
2.to(9).toVector.reverse)
+ // AsyncWriteJournal sends out write responses for 2..9 right away
without waiting
+ getPersistenceIdsFromResponses(receiveN(16)).toSet shouldEqual
2.to(9).toSet
+ // complete write for the first persistence ID
+ pluginRef ! CompleteWriteOps(persistenceIdsInOrder = Vector(1))
+ // and now we finally receive the response for persistence ID 1
+ getPersistenceIdsFromResponses(receiveN(2)) shouldEqual Vector(1)
}
}
- private def mkWriteMessages(num: Int): JournalProtocol.WriteMessages =
JournalProtocol.WriteMessages(
+ private def mkWriteMessages(persistenceId: Int):
JournalProtocol.WriteMessages = JournalProtocol.WriteMessages(
messages = Vector(AtomicWrite(PersistentRepr(
- payload = num,
+ payload = "",
sequenceNr = 0L,
- persistenceId = num.toString
+ persistenceId = persistenceId.toString
))),
persistentActor = self,
actorInstanceId = 1
)
- private def getMessageNumsFromResponses(responses: Seq[AnyRef]): Vector[Int]
= responses.collect {
+ private def getPersistenceIdsFromResponses(responses: Seq[AnyRef]):
Vector[Int] = responses.collect {
case successResponse: JournalProtocol.WriteMessageSuccess =>
- successResponse.persistent.payload.asInstanceOf[Int]
+ successResponse.persistent.persistenceId.toInt
}.toVector
}
private object AsyncWriteJournalResponseOrderSpec {
- case object CompleteWriteOps
+ final case class CompleteWriteOps(persistenceIdsInOrder: Vector[Int])
/**
- * Accumulates asyncWriteMessages requests and completes them in reverse
receive order on [[CompleteWriteOps]] command
+ * Accumulates asyncWriteMessages requests (one for each persistence ID is
expected)
+ * and completes them in requested order on [[CompleteWriteOps]] command.
*/
- class ReversePlugin extends AsyncWriteJournal {
+ final class ControlledWriteCompletionPlugin extends AsyncWriteJournal {
private implicit val ec: ExecutionContext = context.dispatcher
- private var pendingOps: Vector[Promise[Unit]] = Vector.empty
+ private val pendingOps: mutable.HashMap[Int, Promise[Unit]] =
mutable.HashMap.empty
override def receivePluginInternal: Receive = {
- case CompleteWriteOps =>
- pendingOps.reverse.foreach(_.success(()))
- pendingOps = Vector.empty
+ case cmd: CompleteWriteOps =>
+ cmd.persistenceIdsInOrder.foreach { persistenceId =>
+ pendingOps(persistenceId).success(())
+ pendingOps -= persistenceId
+ }
}
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]):
Future[immutable.Seq[Try[Unit]]] = {
val responsePromise = Promise[Unit]()
- pendingOps = pendingOps :+ responsePromise
+ pendingOps.put(messages.head.persistenceId.toInt, responsePromise)
responsePromise.future.map(_ => Vector.empty)
}
@@ -128,4 +142,8 @@ private object AsyncWriteJournalResponseOrderSpec {
override def asyncReadHighestSequenceNr(persistenceId: String,
fromSequenceNr: Long): Future[Long] = ???
}
+
+ object ControlledWriteCompletionPlugin {
+ val BaseId: String =
"pekko.persistence.journal.controlled-write-completion-plugin"
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]