This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch 1.6.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.6.x by this push:
     new a1c929c0b4 actor-typed: Fix ProducerController crash on restart (#2762) (#2853)
a1c929c0b4 is described below

commit a1c929c0b489c3fd0dd2b9dfaa65245ab9361bde
Author: PJ Fanning <[email protected]>
AuthorDate: Sat Apr 11 11:31:27 2026 +0200

    actor-typed: Fix ProducerController crash on restart (#2762) (#2853)
    
    * Initial plan
    
    * Fix ProducerController crash on restart with empty unconfirmed buffer
    
    
    
    * Update ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
    
    * Fix flaky test: dynamic seqNr, stop both actors, handle write-behind redelivery
    
    
    
    * Address reviewer concern: drain redelivery before awaiting RequestNext in Phase 2
    
    
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../delivery/DurableProducerControllerSpec.scala   | 33 ++++++++
 .../delivery/internal/ProducerControllerImpl.scala |  6 +-
 ...DeliveryWithEventSourcedProducerQueueSpec.scala | 90 ++++++++++++++++++++++
 3 files changed, 126 insertions(+), 3 deletions(-)

diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/delivery/DurableProducerControllerSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/delivery/DurableProducerControllerSpec.scala
index 9c8a2047e8..e2da511cfd 100644
--- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/delivery/DurableProducerControllerSpec.scala
+++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/delivery/DurableProducerControllerSpec.scala
@@ -289,6 +289,39 @@ class DurableProducerControllerSpec
 
       testKit.stop(producerController)
     }
+
+    "resume correctly after restart with empty unconfirmed buffer" in {
+      nextId()
+      val consumerControllerProbe = 
createTestProbe[ConsumerController.Command[TestConsumer.Job]]()
+
+      // Simulate a restart where all messages up to seqNr 4 were confirmed 
before shutdown.
+      // The unconfirmed buffer is empty.
+      val durable = TestDurableProducerQueue[TestConsumer.Job](
+        Duration.Zero,
+        DurableProducerQueue.State(
+          currentSeqNr = 5,
+          highestConfirmedSeqNr = 4,
+          confirmedSeqNr = Map(NoQualifier -> (4L -> TestTimestamp)),
+          unconfirmed = 
Vector.empty[DurableProducerQueue.MessageSent[TestConsumer.Job]]))
+
+      val producerController =
+        spawn(ProducerController[TestConsumer.Job](producerId, Some(durable)), 
s"producerController-$idCount")
+          .unsafeUpcast[ProducerControllerImpl.InternalCommand]
+      val producerProbe = 
createTestProbe[ProducerController.RequestNext[TestConsumer.Job]]()
+      producerController ! ProducerController.Start(producerProbe.ref)
+      producerController ! 
ProducerController.RegisterConsumer(consumerControllerProbe.ref)
+
+      // With the fix: RequestNext should carry the restored seqNr (5), not 1.
+      val requestNext = producerProbe.receiveMessage()
+      requestNext.currentSeqNr should ===(5L)
+
+      // Sending a message must succeed without crashing and must be delivered 
with seqNr 5.
+      requestNext.sendNextTo ! TestConsumer.Job("msg-5")
+      consumerControllerProbe.expectMessage(sequencedMessage(producerId, 5, 
producerController).asFirst)
+
+      testKit.stop(producerController)
+    }
+
   }
 
 }
diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala
index 2b6a9cea01..b733c4d765 100644
--- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala
+++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala
@@ -251,7 +251,7 @@ object ProducerControllerImpl {
       requested = false,
       currentSeqNr = loadedState.currentSeqNr,
       confirmedSeqNr = loadedState.highestConfirmedSeqNr,
-      requestedSeqNr = 1L,
+      requestedSeqNr = loadedState.currentSeqNr,
       replyAfterStore = Map.empty,
       supportResend = true,
       unconfirmed = unconfirmed,
@@ -330,8 +330,8 @@ object ProducerControllerImpl {
         val msgAdapter: ActorRef[A] = context.messageAdapter(msg => Msg(msg))
         val requested =
           if (state.unconfirmed.isEmpty) {
-            flightRecorder.producerRequestNext(producerId, 1L, 0)
-            state.producer ! RequestNext(producerId, 1L, 0L, msgAdapter, 
context.self)
+            flightRecorder.producerRequestNext(producerId, state.currentSeqNr, 
state.confirmedSeqNr)
+            state.producer ! RequestNext(producerId, state.currentSeqNr, 
state.confirmedSeqNr, msgAdapter, context.self)
             true
           } else {
             context.log.debug("Starting with [{}] unconfirmed.", 
state.unconfirmed.size)
diff --git a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
index 221a2d537e..4b38ccd6f9 100644
--- a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
+++ b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
@@ -15,12 +15,16 @@ package org.apache.pekko.persistence.typed.delivery
 
 import java.util.UUID
 
+import scala.concurrent.duration._
+import scala.util.Try
+
 import com.typesafe.config.Config
 import com.typesafe.config.ConfigFactory
 import org.scalatest.wordspec.AnyWordSpecLike
 
 import org.apache.pekko
 import pekko.actor.testkit.typed.scaladsl._
+import pekko.actor.typed.ActorRef
 import pekko.actor.typed.delivery.ConsumerController
 import pekko.actor.typed.delivery.ProducerController
 import pekko.persistence.typed.PersistenceId
@@ -177,6 +181,92 @@ class ReliableDeliveryWithEventSourcedProducerQueueSpec(config: Config)
       testKit.stop(consumerController)
     }
 
+    "resume correctly after restart when all messages were confirmed" in {
+      val producerId = "p-restart-clean"
+      val producerProbe = 
createTestProbe[ProducerController.RequestNext[String]]()
+      val consumerProbe = 
createTestProbe[ConsumerController.Delivery[String]]()
+
+      // Phase 1: send one message and confirm it fully, then stop both 
controllers cleanly.
+      val (pc1, cc1) = startProducerAndConsumer(producerId, producerProbe, 
consumerProbe)
+      producerProbe.receiveMessage().sendNextTo ! "msg-1"
+      val del1 = consumerProbe.receiveMessage()
+      del1.confirmTo ! ConsumerController.Confirmed
+      // Wait for the ProducerController to process the confirmation and issue 
a new RequestNext.
+      // The seqNr is captured dynamically: for non-chunked messages it will 
be 2; for chunked
+      // messages each byte is a separate seqNr, so it will be higher (e.g. 6 
for a 5-byte string).
+      // Note: StoreMessageConfirmed is intentionally write-behind 
(fire-and-forget, no reply), so
+      // the confirmation may not yet be persisted to the journal when pc1 is 
stopped below.
+      // If that happens, pc2 will re-deliver the earlier message — Phase 2 
handles this case.
+      val nextSeqNr = producerProbe.receiveMessage().currentSeqNr
+      testKit.stop(pc1)
+      producerProbe.expectTerminated(pc1)
+      testKit.stop(cc1)
+      consumerProbe.expectTerminated(cc1)
+
+      // Phase 2: restart — must NOT crash, and must resume from the persisted 
sequence number.
+      val (pc2, cc2) = startProducerAndConsumer(producerId, producerProbe, 
consumerProbe)
+
+      // If StoreMessageConfirmed was not yet persisted when pc1 stopped, pc2 
restarts with
+      // non-empty unconfirmed and re-delivers the earlier message(s) BEFORE 
emitting RequestNext.
+      // Causal ordering: cc2 enqueues a Delivery to consumerProbe in the same 
handler that
+      // it enqueues a Request to pc2; pc2 only emits RequestNext after 
processing that Request.
+      // Therefore, if any redelivery is coming, it arrives at consumerProbe 
before RequestNext
+      // reaches producerProbe. Draining it first (short timeout) prevents 
producerProbe from
+      // having to wait for the full redelivery round-trip before RequestNext 
arrives.
+      // The timeout only applies to the no-redelivery 
(confirmation-persisted) case, where it
+      // adds a small, bounded delay rather than an indefinite block.
+      // 500 ms is generous for local actor messaging (typically < 10 ms) but 
short enough
+      // that in the no-redelivery case the timeout does not slow down the 
test noticeably.
+      Try(consumerProbe.receiveMessage(500.millis)).foreach { redelivery =>
+        // A delivery before we have sent msg-2 can only be a redelivery of 
msg-1.
+        redelivery.message should ===("msg-1")
+        redelivery.confirmTo ! ConsumerController.Confirmed
+      }
+
+      // RequestNext now arrives quickly in both cases:
+      //   - confirmed:     pc2 emitted it immediately from becomeActive 
(empty unconfirmed)
+      //   - not confirmed: pc2 emitted it after cc2's initial Request 
(triggered by the redelivery)
+      val req2 = producerProbe.receiveMessage()
+      // The bug caused requestedSeqNr to be hardcoded to 1 regardless of the 
persisted state,
+      // so currentSeqNr > requestedSeqNr and the controller crashed on the 
next message.
+      req2.currentSeqNr should ===(nextSeqNr)
+      req2.sendNextTo ! "msg-2"
+      // For the chunked-message case, pc2 may have re-delivered only the 
first chunk via
+      // ResendFirst before RequestNext was sent. Sending msg-2 triggers a 
Resend cycle for
+      // the remaining chunks, so the assembled msg-1 may arrive at 
consumerProbe before msg-2.
+      val firstDelivery = consumerProbe.receiveMessage()
+      if (firstDelivery.message != "msg-2") {
+        firstDelivery.message should ===("msg-1") // sanity-check: only msg-1 
can be redelivered
+        firstDelivery.confirmTo ! ConsumerController.Confirmed
+        consumerProbe.receiveMessage().message should ===("msg-2")
+      }
+      testKit.stop(pc2)
+      testKit.stop(cc2)
+    }
+
+  }
+
+  // Helper to start a ProducerController (with EventSourcedProducerQueue) and 
a ConsumerController.
+  // Returns both refs so callers can stop them independently.
+  private def startProducerAndConsumer(
+      producerId: String,
+      producerProbe: TestProbe[ProducerController.RequestNext[String]],
+      consumerProbe: TestProbe[ConsumerController.Delivery[String]]
+  ): (ActorRef[ProducerController.Command[String]], 
ActorRef[ConsumerController.Command[String]]) = {
+
+    val persistenceId = PersistenceId.ofUniqueId(producerId)
+    val durableQueue = EventSourcedProducerQueue[String](persistenceId)
+
+    val producerController = spawn(
+      ProducerController[String](producerId, Some(durableQueue)))
+
+    val consumerController = spawn(ConsumerController[String]())
+
+    producerController ! 
ProducerController.RegisterConsumer(consumerController)
+    producerController ! ProducerController.Start(producerProbe.ref)
+    consumerController ! ConsumerController.Start(consumerProbe.ref)
+
+    (producerController, consumerController)
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to