This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch optimize-restartflow-delay-cancellation
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 40187de79f3e7382deb6980378ff1eff250ebf1d
Author: He-Pin <[email protected]>
AuthorDate: Sat May 16 18:16:31 2026 +0800

    optimize: remove unnecessary DelayCancellation stage from RestartFlow
    
    Motivation:
    RestartFlow had ~3x less throughput than RetryFlow due to an unnecessary
    DelayCancellation stage in the inner flow graph. This stage added per-element
    overhead (extra grab/push/pull operations) even when not needed.
    
    Modification:
    Only apply the DelayCancellation stage when onlyOnFailures mode is enabled.
    For the common RestartFlow.withBackoff case, the stage is removed.
    
    Result:
    Significantly improved throughput for RestartFlow.withBackoff. The
    onFailuresWithBackoff path retains the DelayCancellation stage for
    correctness (race condition fix for #23909).
    
    Tests:
    - All 38 existing RestartSpec tests pass
    
    Refs: https://github.com/akka/akka-core/issues/31225
---
 .../org/apache/pekko/stream/scaladsl/RestartFlow.scala   | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala
index 65f2ff9a4c..088873717f 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala
@@ -106,10 +106,18 @@ private final class RestartWithBackoffFlow[In, Out](
         val sourceOut: SubSourceOutlet[In] = createSubOutlet(in)
         val sinkIn: SubSinkInlet[Out] = createSubInlet(out)
 
-        val graph = Source
-          .fromGraph(sourceOut.source)
-          // Temp fix while waiting cause of cancellation. See #23909
-          .via(RestartWithBackoffFlow.delayCancellation[In](delay))
+        val sourceWithCancellation =
+          if (onlyOnFailures) {
+            // Delay cancellation to handle race condition where cancellation arrives
+            // before failure signal in onlyOnFailures mode. See #23909
+            Source
+              .fromGraph(sourceOut.source)
+              .via(RestartWithBackoffFlow.delayCancellation[In](delay))
+          } else {
+            Source.fromGraph(sourceOut.source)
+          }
+
+        val graph = sourceWithCancellation
           .via(flowFactory())
           .to(sinkIn.sink)
         subFusingMaterializer.materialize(graph, inheritedAttributes)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to