This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.3.x by this push:
     new 5f7c791e12 Fix AsyncWriteJournalResponseOrderSpec with global 
dispatcher (#2477) (#2485)
5f7c791e12 is described below

commit 5f7c791e125ff4e2e5e7fe748a48507156e9298b
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Nov 12 23:16:13 2025 +0100

    Fix AsyncWriteJournalResponseOrderSpec with global dispatcher (#2477) 
(#2485)
    
    * Fix AsyncWriteJournalResponseOrderSpec with global dispatcher
    
    This fixes a random test failure when the global dispatcher is used
    as a persistence plugin dispatcher.
    
    This change should unblock https://github.com/apache/pekko/pull/2434
    Also adjusted the wording changes made for write-response-global-order in 
https://github.com/apache/pekko/pull/2437
    
    The test relied on an ordering guarantee which wasn't really there.
    Even though asyncWriteMessages promises were completed in some particular 
order, AsyncWriteJournal sent out messages in an onComplete callback scheduled 
on the actor dispatcher, yielding random message order.
    The test failure shouldn't be a problem for production (non-test) Pekko 
code, because nothing there relies
    on write response ordering guarantees between independent persistent 
actors. As for the writes of an individual persistent actor,
    the persistent actor implementation internals handle their ordering 
correctly regardless of the plugin response ordering.
    
    * FIXUP
    
    Co-authored-by: Mikhail Sokolov <[email protected]>
---
 persistence/src/main/resources/reference.conf      |   2 -
 .../AsyncWriteJournalResponseOrderSpec.scala       | 102 ++++++++++++---------
 2 files changed, 60 insertions(+), 44 deletions(-)

diff --git a/persistence/src/main/resources/reference.conf 
b/persistence/src/main/resources/reference.conf
index 8c7bbcd559..191c83dc89 100644
--- a/persistence/src/main/resources/reference.conf
+++ b/persistence/src/main/resources/reference.conf
@@ -164,8 +164,6 @@ pekko.persistence {
       #
       # When this setting is "off", the journal plugin is allowed to send back 
write responses
       # in any order. This can improve throughput and reduce latency under 
load.
-      # When using "off", changes to the dispatcher configuration of the 
journal plugin
-      # can lead to an increased number of out-of-order responses.
       #
       # The old behavior is still enabled by default ("on").
       write-response-global-order = on
diff --git 
a/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
 
b/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
index a0faa633c0..3f3537c0ee 100644
--- 
a/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
+++ 
b/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
@@ -17,12 +17,13 @@
 
 package org.apache.pekko.persistence.journal
 
-import org.apache.pekko.persistence.{ AtomicWrite, JournalProtocol, 
PersistenceSpec, PersistentRepr }
-import org.apache.pekko.testkit.ImplicitSender
+import 
org.apache.pekko.persistence.journal.AsyncWriteJournalResponseOrderSpec._
 
-import scala.collection.immutable
+import scala.collection.{ immutable, mutable }
 import scala.concurrent.{ ExecutionContext, Future, Promise }
 import scala.util.Try
+import org.apache.pekko.persistence.{ AtomicWrite, JournalProtocol, 
PersistenceSpec, PersistentRepr }
+import org.apache.pekko.testkit.ImplicitSender
 
 /**
  * Verifies write response ordering logic for [[AsyncWriteJournal]].
@@ -34,90 +35,103 @@ class AsyncWriteJournalResponseOrderSpec
       PersistenceSpec.config(
         plugin = "", // we will provide explicit plugin IDs later
         test = classOf[AsyncWriteJournalResponseOrderSpec].getSimpleName,
+        // using the default system dispatcher to make sure 
write-response-global-order works with it
+        // see: https://github.com/apache/pekko/pull/2434
         extraConfig = Some(
           s"""
-          |pekko.persistence.journal.reverse-plugin {
+          |${ControlledWriteCompletionPlugin.BaseId} {
           |  with-global-order {
-          |    class = 
"${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}"
-          |    
+          |    class = "${classOf[ControlledWriteCompletionPlugin].getName}"
+          |    plugin-dispatcher = "pekko.actor.default-dispatcher"
           |    write-response-global-order = on
           |  }
           |  no-global-order {
-          |    class = 
"${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}"
-          |    
+          |    class = "${classOf[ControlledWriteCompletionPlugin].getName}"
+          |    plugin-dispatcher = "pekko.actor.default-dispatcher"
           |    write-response-global-order = off
           |  }
           |}
           |""".stripMargin
         ))) with ImplicitSender {
 
-  import AsyncWriteJournalResponseOrderSpec._
-
   "AsyncWriteJournal" must {
     "return write responses in request order if global response order is 
enabled" in {
       val pluginRef =
-        extension.journalFor(journalPluginId = 
"pekko.persistence.journal.reverse-plugin.with-global-order")
-
-      pluginRef ! mkWriteMessages(1)
-      pluginRef ! mkWriteMessages(2)
-      pluginRef ! mkWriteMessages(3)
-
-      pluginRef ! CompleteWriteOps
-
-      getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(1, 2, 3)
+        extension.journalFor(journalPluginId = 
s"${ControlledWriteCompletionPlugin.BaseId}.with-global-order")
+
+      // request writes for persistence Ids 1..9
+      1.to(9).foreach { persistenceId =>
+        pluginRef ! mkWriteMessages(persistenceId = persistenceId)
+      }
+      // complete writes for all but the first persistence ID in reverse order
+      pluginRef ! CompleteWriteOps(persistenceIdsInOrder = 
2.to(9).toVector.reverse)
+      // AsyncWriteJournal should hold the responses yet to preserve the 
response order
+      expectNoMessage()
+      // complete write for the first persistence ID
+      pluginRef ! CompleteWriteOps(persistenceIdsInOrder = Vector(1))
+      // now we should receive all responses in the order in which they have 
been requested
+      getPersistenceIdsFromResponses(receiveN(18)) shouldEqual 1.to(9).toVector
     }
 
-    "return write responses in completion order if global response order is 
disabled" in {
+    "return write responses as soon as the operation is complete if global 
response order is disabled" in {
       val pluginRef =
-        extension.journalFor(journalPluginId = 
"pekko.persistence.journal.reverse-plugin.no-global-order")
-
-      pluginRef ! mkWriteMessages(1)
-      pluginRef ! mkWriteMessages(2)
-      pluginRef ! mkWriteMessages(3)
-
-      pluginRef ! CompleteWriteOps
-
-      getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(3, 2, 1)
+        extension.journalFor(journalPluginId = 
s"${ControlledWriteCompletionPlugin.BaseId}.no-global-order")
+
+      // request writes for persistence Ids 1..9
+      1.to(9).foreach { persistenceId =>
+        pluginRef ! mkWriteMessages(persistenceId = persistenceId)
+      }
+      // complete writes for all but the first persistence ID in reverse order
+      pluginRef ! CompleteWriteOps(persistenceIdsInOrder = 
2.to(9).toVector.reverse)
+      // AsyncWriteJournal sends out write responses for 2..9 right away 
without waiting
+      getPersistenceIdsFromResponses(receiveN(16)).toSet shouldEqual 
2.to(9).toSet
+      // complete write for the first persistence ID
+      pluginRef ! CompleteWriteOps(persistenceIdsInOrder = Vector(1))
+      // and now we finally receive the response for persistence ID 1
+      getPersistenceIdsFromResponses(receiveN(2)) shouldEqual Vector(1)
     }
   }
 
-  private def mkWriteMessages(num: Int): JournalProtocol.WriteMessages = 
JournalProtocol.WriteMessages(
+  private def mkWriteMessages(persistenceId: Int): 
JournalProtocol.WriteMessages = JournalProtocol.WriteMessages(
     messages = Vector(AtomicWrite(PersistentRepr(
-      payload = num,
+      payload = "",
       sequenceNr = 0L,
-      persistenceId = num.toString
+      persistenceId = persistenceId.toString
     ))),
     persistentActor = self,
     actorInstanceId = 1
   )
 
-  private def getMessageNumsFromResponses(responses: Seq[AnyRef]): Vector[Int] 
= responses.collect {
+  private def getPersistenceIdsFromResponses(responses: Seq[AnyRef]): 
Vector[Int] = responses.collect {
     case successResponse: JournalProtocol.WriteMessageSuccess =>
-      successResponse.persistent.payload.asInstanceOf[Int]
+      successResponse.persistent.persistenceId.toInt
   }.toVector
 }
 
 private object AsyncWriteJournalResponseOrderSpec {
-  case object CompleteWriteOps
+  final case class CompleteWriteOps(persistenceIdsInOrder: Vector[Int])
 
   /**
-   * Accumulates asyncWriteMessages requests and completes them in reverse 
receive order on [[CompleteWriteOps]] command
+   * Accumulates asyncWriteMessages requests (one for each persistence ID is 
expected)
+   * and completes them in requested order on [[CompleteWriteOps]] command.
    */
-  class ReversePlugin extends AsyncWriteJournal {
+  final class ControlledWriteCompletionPlugin extends AsyncWriteJournal {
 
     private implicit val ec: ExecutionContext = context.dispatcher
 
-    private var pendingOps: Vector[Promise[Unit]] = Vector.empty
+    private val pendingOps: mutable.HashMap[Int, Promise[Unit]] = 
mutable.HashMap.empty
 
     override def receivePluginInternal: Receive = {
-      case CompleteWriteOps =>
-        pendingOps.reverse.foreach(_.success(()))
-        pendingOps = Vector.empty
+      case cmd: CompleteWriteOps =>
+        cmd.persistenceIdsInOrder.foreach { persistenceId =>
+          pendingOps(persistenceId).success(())
+          pendingOps -= persistenceId
+        }
     }
 
     override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): 
Future[immutable.Seq[Try[Unit]]] = {
       val responsePromise = Promise[Unit]()
-      pendingOps = pendingOps :+ responsePromise
+      pendingOps.put(messages.head.persistenceId.toInt, responsePromise)
       responsePromise.future.map(_ => Vector.empty)
     }
 
@@ -128,4 +142,8 @@ private object AsyncWriteJournalResponseOrderSpec {
 
     override def asyncReadHighestSequenceNr(persistenceId: String, 
fromSequenceNr: Long): Future[Long] = ???
   }
+
+  object ControlledWriteCompletionPlugin {
+    val BaseId: String = 
"pekko.persistence.journal.controlled-write-completion-plugin"
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to