This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch 1.6.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.6.x by this push:
new a1c929c0b4 actor-typed: Fix ProducerController crash on restart
(#2762) (#2853)
a1c929c0b4 is described below
commit a1c929c0b489c3fd0dd2b9dfaa65245ab9361bde
Author: PJ Fanning <[email protected]>
AuthorDate: Sat Apr 11 11:31:27 2026 +0200
actor-typed: Fix ProducerController crash on restart (#2762) (#2853)
* Initial plan
* Fix ProducerController crash on restart with empty unconfirmed buffer
* Update ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
* Fix flaky test: dynamic seqNr, stop both actors, handle write-behind
redelivery
* Address reviewer concern: drain redelivery before awaiting RequestNext in
Phase 2
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../delivery/DurableProducerControllerSpec.scala | 33 ++++++++
.../delivery/internal/ProducerControllerImpl.scala | 6 +-
...DeliveryWithEventSourcedProducerQueueSpec.scala | 90 ++++++++++++++++++++++
3 files changed, 126 insertions(+), 3 deletions(-)
diff --git
a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/delivery/DurableProducerControllerSpec.scala
b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/delivery/DurableProducerControllerSpec.scala
index 9c8a2047e8..e2da511cfd 100644
---
a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/delivery/DurableProducerControllerSpec.scala
+++
b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/delivery/DurableProducerControllerSpec.scala
@@ -289,6 +289,39 @@ class DurableProducerControllerSpec
testKit.stop(producerController)
}
+
+ "resume correctly after restart with empty unconfirmed buffer" in {
+ nextId()
+ val consumerControllerProbe =
createTestProbe[ConsumerController.Command[TestConsumer.Job]]()
+
+ // Simulate a restart where all messages up to seqNr 4 were confirmed
before shutdown.
+ // The unconfirmed buffer is empty.
+ val durable = TestDurableProducerQueue[TestConsumer.Job](
+ Duration.Zero,
+ DurableProducerQueue.State(
+ currentSeqNr = 5,
+ highestConfirmedSeqNr = 4,
+ confirmedSeqNr = Map(NoQualifier -> (4L -> TestTimestamp)),
+ unconfirmed =
Vector.empty[DurableProducerQueue.MessageSent[TestConsumer.Job]]))
+
+ val producerController =
+ spawn(ProducerController[TestConsumer.Job](producerId, Some(durable)),
s"producerController-$idCount")
+ .unsafeUpcast[ProducerControllerImpl.InternalCommand]
+ val producerProbe =
createTestProbe[ProducerController.RequestNext[TestConsumer.Job]]()
+ producerController ! ProducerController.Start(producerProbe.ref)
+ producerController !
ProducerController.RegisterConsumer(consumerControllerProbe.ref)
+
+ // With the fix: RequestNext should carry the restored seqNr (5), not 1.
+ val requestNext = producerProbe.receiveMessage()
+ requestNext.currentSeqNr should ===(5L)
+
+ // Sending a message must succeed without crashing and must be delivered
with seqNr 5.
+ requestNext.sendNextTo ! TestConsumer.Job("msg-5")
+ consumerControllerProbe.expectMessage(sequencedMessage(producerId, 5,
producerController).asFirst)
+
+ testKit.stop(producerController)
+ }
+
}
}
diff --git
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala
index 2b6a9cea01..b733c4d765 100644
---
a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala
+++
b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala
@@ -251,7 +251,7 @@ object ProducerControllerImpl {
requested = false,
currentSeqNr = loadedState.currentSeqNr,
confirmedSeqNr = loadedState.highestConfirmedSeqNr,
- requestedSeqNr = 1L,
+ requestedSeqNr = loadedState.currentSeqNr,
replyAfterStore = Map.empty,
supportResend = true,
unconfirmed = unconfirmed,
@@ -330,8 +330,8 @@ object ProducerControllerImpl {
val msgAdapter: ActorRef[A] = context.messageAdapter(msg => Msg(msg))
val requested =
if (state.unconfirmed.isEmpty) {
- flightRecorder.producerRequestNext(producerId, 1L, 0)
- state.producer ! RequestNext(producerId, 1L, 0L, msgAdapter,
context.self)
+ flightRecorder.producerRequestNext(producerId, state.currentSeqNr,
state.confirmedSeqNr)
+ state.producer ! RequestNext(producerId, state.currentSeqNr,
state.confirmedSeqNr, msgAdapter, context.self)
true
} else {
context.log.debug("Starting with [{}] unconfirmed.",
state.unconfirmed.size)
diff --git
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
index 221a2d537e..4b38ccd6f9 100644
---
a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
+++
b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
@@ -15,12 +15,16 @@ package org.apache.pekko.persistence.typed.delivery
import java.util.UUID
+import scala.concurrent.duration._
+import scala.util.Try
+
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import org.apache.pekko
import pekko.actor.testkit.typed.scaladsl._
+import pekko.actor.typed.ActorRef
import pekko.actor.typed.delivery.ConsumerController
import pekko.actor.typed.delivery.ProducerController
import pekko.persistence.typed.PersistenceId
@@ -177,6 +181,92 @@ class
ReliableDeliveryWithEventSourcedProducerQueueSpec(config: Config)
testKit.stop(consumerController)
}
+ "resume correctly after restart when all messages were confirmed" in {
+ val producerId = "p-restart-clean"
+ val producerProbe =
createTestProbe[ProducerController.RequestNext[String]]()
+ val consumerProbe =
createTestProbe[ConsumerController.Delivery[String]]()
+
+ // Phase 1: send one message and confirm it fully, then stop both
controllers cleanly.
+ val (pc1, cc1) = startProducerAndConsumer(producerId, producerProbe,
consumerProbe)
+ producerProbe.receiveMessage().sendNextTo ! "msg-1"
+ val del1 = consumerProbe.receiveMessage()
+ del1.confirmTo ! ConsumerController.Confirmed
+ // Wait for the ProducerController to process the confirmation and issue
a new RequestNext.
+ // The seqNr is captured dynamically: for non-chunked messages it will
be 2; for chunked
+ // messages each byte is a separate seqNr, so it will be higher (e.g. 6
for a 5-byte string).
+ // Note: StoreMessageConfirmed is intentionally write-behind
(fire-and-forget, no reply), so
+ // the confirmation may not yet be persisted to the journal when pc1 is
stopped below.
+ // If that happens, pc2 will re-deliver the earlier message — Phase 2
handles this case.
+ val nextSeqNr = producerProbe.receiveMessage().currentSeqNr
+ testKit.stop(pc1)
+ producerProbe.expectTerminated(pc1)
+ testKit.stop(cc1)
+ consumerProbe.expectTerminated(cc1)
+
+ // Phase 2: restart — must NOT crash, and must resume from the persisted
sequence number.
+ val (pc2, cc2) = startProducerAndConsumer(producerId, producerProbe,
consumerProbe)
+
+ // If StoreMessageConfirmed was not yet persisted when pc1 stopped, pc2
restarts with
+ // non-empty unconfirmed and re-delivers the earlier message(s) BEFORE
emitting RequestNext.
+ // Causal ordering: cc2 enqueues a Delivery to consumerProbe in the same
handler that
+ // it enqueues a Request to pc2; pc2 only emits RequestNext after
processing that Request.
+ // Therefore, if any redelivery is coming, it arrives at consumerProbe
before RequestNext
+ // reaches producerProbe. Draining it first (short timeout) prevents
producerProbe from
+ // having to wait for the full redelivery round-trip before RequestNext
arrives.
+ // The timeout only applies to the no-redelivery
(confirmation-persisted) case, where it
+ // adds a small, bounded delay rather than an indefinite block.
+ // 500 ms is generous for local actor messaging (typically < 10 ms) but
short enough
+ // that in the no-redelivery case the timeout does not slow down the
test noticeably.
+ Try(consumerProbe.receiveMessage(500.millis)).foreach { redelivery =>
+ // A delivery before we have sent msg-2 can only be a redelivery of
msg-1.
+ redelivery.message should ===("msg-1")
+ redelivery.confirmTo ! ConsumerController.Confirmed
+ }
+
+ // RequestNext now arrives quickly in both cases:
+ // - confirmed: pc2 emitted it immediately from becomeActive
(empty unconfirmed)
+ // - not confirmed: pc2 emitted it after cc2's initial Request
(triggered by the redelivery)
+ val req2 = producerProbe.receiveMessage()
+ // The bug caused requestedSeqNr to be hardcoded to 1 regardless of the
persisted state,
+ // so currentSeqNr > requestedSeqNr and the controller crashed on the
next message.
+ req2.currentSeqNr should ===(nextSeqNr)
+ req2.sendNextTo ! "msg-2"
+ // For the chunked-message case, pc2 may have re-delivered only the
first chunk via
+ // ResendFirst before RequestNext was sent. Sending msg-2 triggers a
Resend cycle for
+ // the remaining chunks, so the assembled msg-1 may arrive at
consumerProbe before msg-2.
+ val firstDelivery = consumerProbe.receiveMessage()
+ if (firstDelivery.message != "msg-2") {
+ firstDelivery.message should ===("msg-1") // sanity-check: only msg-1
can be redelivered
+ firstDelivery.confirmTo ! ConsumerController.Confirmed
+ consumerProbe.receiveMessage().message should ===("msg-2")
+ }
+ testKit.stop(pc2)
+ testKit.stop(cc2)
+ }
+
+ }
+
+ // Helper to start a ProducerController (with EventSourcedProducerQueue) and
a ConsumerController.
+ // Returns both refs so callers can stop them independently.
+ private def startProducerAndConsumer(
+ producerId: String,
+ producerProbe: TestProbe[ProducerController.RequestNext[String]],
+ consumerProbe: TestProbe[ConsumerController.Delivery[String]]
+ ): (ActorRef[ProducerController.Command[String]],
ActorRef[ConsumerController.Command[String]]) = {
+
+ val persistenceId = PersistenceId.ofUniqueId(producerId)
+ val durableQueue = EventSourcedProducerQueue[String](persistenceId)
+
+ val producerController = spawn(
+ ProducerController[String](producerId, Some(durableQueue)))
+
+ val consumerController = spawn(ConsumerController[String]())
+
+ producerController !
ProducerController.RegisterConsumer(consumerController)
+ producerController ! ProducerController.Start(producerProbe.ref)
+ consumerController ! ConsumerController.Start(consumerProbe.ref)
+
+ (producerController, consumerController)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]