This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new d52aca20bc Fix AsyncWriteJournalResponseOrderSpec with global 
dispatcher (#2477)
d52aca20bc is described below

commit d52aca20bcd184eda52e3b2ed9dd906e9cc16126
Author: Mikhail Sokolov <[email protected]>
AuthorDate: Wed Nov 12 11:26:32 2025 +0200

    Fix AsyncWriteJournalResponseOrderSpec with global dispatcher (#2477)
    
    * Fix AsyncWriteJournalResponseOrderSpec with global dispatcher
    
    This fixes a random test failure when the global dispatcher used
    as a persistence plugin dispatcher.
    
    This change should unblock https://github.com/apache/pekko/pull/2434
    Also adjusted wording changes for write-response-global-order from 
https://github.com/apache/pekko/pull/2437
    
    The test relied on an ordering guarantee which wasn't really there.
    Even though asyncWriteMessages promises were completed in some particular 
order, AsyncWriteJournal sent out messages in an onComplete callback scheduled 
on the actor dispatcher, yielding random message order.
    The test failure shouldn't be a problem for production, non-test, Pekko 
code, because nothing there relies
    on write response ordering guarantees between independent persistent 
actors. And for an individual persistent actor writes
    - the persistent actor implementation internals handle ordering there 
correctly regardless of the plugin response ordering.
    
    * FIXUP
---
 persistence/src/main/resources/reference.conf      |   2 -
 .../AsyncWriteJournalResponseOrderSpec.scala       | 100 ++++++++++++---------
 2 files changed, 59 insertions(+), 43 deletions(-)

diff --git a/persistence/src/main/resources/reference.conf 
b/persistence/src/main/resources/reference.conf
index 8c7bbcd559..191c83dc89 100644
--- a/persistence/src/main/resources/reference.conf
+++ b/persistence/src/main/resources/reference.conf
@@ -164,8 +164,6 @@ pekko.persistence {
       #
       # When this setting is "off", the journal plugin is allowed to send back 
write responses
       # in any order. This can improve throughput and reduce latency under 
load.
-      # When using "off", changes to the dispatcher configuration of the 
journal plugin
-      # can lead to an increased number of out-of-order responses.
       #
       # The old behavior is still enabled by default ("on").
       write-response-global-order = on
diff --git 
a/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
 
b/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
index c7c9efa58e..3f3537c0ee 100644
--- 
a/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
+++ 
b/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
@@ -17,10 +17,11 @@
 
 package org.apache.pekko.persistence.journal
 
-import scala.collection.immutable
+import 
org.apache.pekko.persistence.journal.AsyncWriteJournalResponseOrderSpec._
+
+import scala.collection.{ immutable, mutable }
 import scala.concurrent.{ ExecutionContext, Future, Promise }
 import scala.util.Try
-
 import org.apache.pekko.persistence.{ AtomicWrite, JournalProtocol, 
PersistenceSpec, PersistentRepr }
 import org.apache.pekko.testkit.ImplicitSender
 
@@ -34,90 +35,103 @@ class AsyncWriteJournalResponseOrderSpec
       PersistenceSpec.config(
         plugin = "", // we will provide explicit plugin IDs later
         test = classOf[AsyncWriteJournalResponseOrderSpec].getSimpleName,
+        // using the default system dispatcher to make sure 
write-response-global-order works with it
+        // see: https://github.com/apache/pekko/pull/2434
         extraConfig = Some(
           s"""
-          |pekko.persistence.journal.reverse-plugin {
+          |${ControlledWriteCompletionPlugin.BaseId} {
           |  with-global-order {
-          |    class = 
"${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}"
-          |    
+          |    class = "${classOf[ControlledWriteCompletionPlugin].getName}"
+          |    plugin-dispatcher = "pekko.actor.default-dispatcher"
           |    write-response-global-order = on
           |  }
           |  no-global-order {
-          |    class = 
"${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}"
-          |    
+          |    class = "${classOf[ControlledWriteCompletionPlugin].getName}"
+          |    plugin-dispatcher = "pekko.actor.default-dispatcher"
           |    write-response-global-order = off
           |  }
           |}
           |""".stripMargin
         ))) with ImplicitSender {
 
-  import AsyncWriteJournalResponseOrderSpec._
-
   "AsyncWriteJournal" must {
     "return write responses in request order if global response order is 
enabled" in {
       val pluginRef =
-        extension.journalFor(journalPluginId = 
"pekko.persistence.journal.reverse-plugin.with-global-order")
-
-      pluginRef ! mkWriteMessages(1)
-      pluginRef ! mkWriteMessages(2)
-      pluginRef ! mkWriteMessages(3)
-
-      pluginRef ! CompleteWriteOps
-
-      getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(1, 2, 3)
+        extension.journalFor(journalPluginId = 
s"${ControlledWriteCompletionPlugin.BaseId}.with-global-order")
+
+      // request writes for persistence Ids 1..9
+      1.to(9).foreach { persistenceId =>
+        pluginRef ! mkWriteMessages(persistenceId = persistenceId)
+      }
+      // complete writes for all but the first persistence ID in reverse order
+      pluginRef ! CompleteWriteOps(persistenceIdsInOrder = 
2.to(9).toVector.reverse)
+      // AsyncWriteJournal should hold the responses yet to preserve the 
response order
+      expectNoMessage()
+      // complete write for the first persistence ID
+      pluginRef ! CompleteWriteOps(persistenceIdsInOrder = Vector(1))
+      // now we should receive all responses in the order in which they have 
been requested
+      getPersistenceIdsFromResponses(receiveN(18)) shouldEqual 1.to(9).toVector
     }
 
-    "return write responses in completion order if global response order is 
disabled" in {
+    "return write responses as soon as the operation is complete if global 
response order is disabled" in {
       val pluginRef =
-        extension.journalFor(journalPluginId = 
"pekko.persistence.journal.reverse-plugin.no-global-order")
-
-      pluginRef ! mkWriteMessages(1)
-      pluginRef ! mkWriteMessages(2)
-      pluginRef ! mkWriteMessages(3)
-
-      pluginRef ! CompleteWriteOps
-
-      getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(3, 2, 1)
+        extension.journalFor(journalPluginId = 
s"${ControlledWriteCompletionPlugin.BaseId}.no-global-order")
+
+      // request writes for persistence Ids 1..9
+      1.to(9).foreach { persistenceId =>
+        pluginRef ! mkWriteMessages(persistenceId = persistenceId)
+      }
+      // complete writes for all but the first persistence ID in reverse order
+      pluginRef ! CompleteWriteOps(persistenceIdsInOrder = 
2.to(9).toVector.reverse)
+      // AsyncWriteJournal sends out write responses for 2..9 right away 
without waiting
+      getPersistenceIdsFromResponses(receiveN(16)).toSet shouldEqual 
2.to(9).toSet
+      // complete write for the first persistence ID
+      pluginRef ! CompleteWriteOps(persistenceIdsInOrder = Vector(1))
+      // and now we finally receive the response for persistence ID 1
+      getPersistenceIdsFromResponses(receiveN(2)) shouldEqual Vector(1)
     }
   }
 
-  private def mkWriteMessages(num: Int): JournalProtocol.WriteMessages = 
JournalProtocol.WriteMessages(
+  private def mkWriteMessages(persistenceId: Int): 
JournalProtocol.WriteMessages = JournalProtocol.WriteMessages(
     messages = Vector(AtomicWrite(PersistentRepr(
-      payload = num,
+      payload = "",
       sequenceNr = 0L,
-      persistenceId = num.toString
+      persistenceId = persistenceId.toString
     ))),
     persistentActor = self,
     actorInstanceId = 1
   )
 
-  private def getMessageNumsFromResponses(responses: Seq[AnyRef]): Vector[Int] 
= responses.collect {
+  private def getPersistenceIdsFromResponses(responses: Seq[AnyRef]): 
Vector[Int] = responses.collect {
     case successResponse: JournalProtocol.WriteMessageSuccess =>
-      successResponse.persistent.payload.asInstanceOf[Int]
+      successResponse.persistent.persistenceId.toInt
   }.toVector
 }
 
 private object AsyncWriteJournalResponseOrderSpec {
-  case object CompleteWriteOps
+  final case class CompleteWriteOps(persistenceIdsInOrder: Vector[Int])
 
   /**
-   * Accumulates asyncWriteMessages requests and completes them in reverse 
receive order on [[CompleteWriteOps]] command
+   * Accumulates asyncWriteMessages requests (one for each persistence ID is 
expected)
+   * and completes them in requested order on [[CompleteWriteOps]] command.
    */
-  class ReversePlugin extends AsyncWriteJournal {
+  final class ControlledWriteCompletionPlugin extends AsyncWriteJournal {
 
     private implicit val ec: ExecutionContext = context.dispatcher
 
-    private var pendingOps: Vector[Promise[Unit]] = Vector.empty
+    private val pendingOps: mutable.HashMap[Int, Promise[Unit]] = 
mutable.HashMap.empty
 
     override def receivePluginInternal: Receive = {
-      case CompleteWriteOps =>
-        pendingOps.reverse.foreach(_.success(()))
-        pendingOps = Vector.empty
+      case cmd: CompleteWriteOps =>
+        cmd.persistenceIdsInOrder.foreach { persistenceId =>
+          pendingOps(persistenceId).success(())
+          pendingOps -= persistenceId
+        }
     }
 
     override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): 
Future[immutable.Seq[Try[Unit]]] = {
       val responsePromise = Promise[Unit]()
-      pendingOps = pendingOps :+ responsePromise
+      pendingOps.put(messages.head.persistenceId.toInt, responsePromise)
       responsePromise.future.map(_ => Vector.empty)
     }
 
@@ -128,4 +142,8 @@ private object AsyncWriteJournalResponseOrderSpec {
 
     override def asyncReadHighestSequenceNr(persistenceId: String, 
fromSequenceNr: Long): Future[Long] = ???
   }
+
+  object ControlledWriteCompletionPlugin {
+    val BaseId: String = 
"pekko.persistence.journal.controlled-write-completion-plugin"
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to