This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new a5058437af optimize: skip afterStageHasRun no-op finalize check in GraphInterpreter chase hot path (#2986)
a5058437af is described below

commit a5058437af146430578a9c35c5e80f477f46692b
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu May 21 19:42:20 2026 +0800

    optimize: skip afterStageHasRun no-op finalize check in GraphInterpreter chase hot path (#2986)
    
    * test: stop leaking ActorSystem in InterpreterBenchmark per invocation
    
    Motivation:
    The previous shape `new GraphInterpreterSpecKit { new TestSetup { ... } }` ran
    inside @Benchmark, so each invocation built (and never tore down) a fresh
    ActorSystem. Long iterations exhausted native threads and JMH reported empty
    results once the JVM ran out of resources.
    
    Modification:
    Make the benchmark class itself extend GraphInterpreterSpecKit so JMH's
    @State(Scope.Benchmark) lifecycle reuses one ActorSystem across all
    invocations. Add @TearDown(Level.Trial) to terminate it cleanly.
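    A dependency-free sketch of the lifecycle difference, assuming a hypothetical
    FakeSystem that stands in for an ActorSystem and only counts live instances
    (no JMH or Pekko APIs are used). The per-invocation shape leaks one system per
    call; the per-trial shape mirrors a single @State(Scope.Benchmark) instance
    cleaned up by @TearDown(Level.Trial) and leaves nothing alive.

```scala
// Hypothetical model of the benchmark lifecycle; FakeSystem is a stand-in, not ActorSystem.
object LeakModel {
  var live: Int = 0 // systems created but not yet terminated

  final class FakeSystem {
    live += 1
    def run(): Unit = ()                 // one interpreter run, elided
    def terminate(): Unit = live -= 1
  }

  // Old shape: every invocation builds a fresh system and never tears it down.
  def perInvocation(invocations: Int): Int = {
    live = 0
    (1 to invocations).foreach(_ => new FakeSystem().run())
    live
  }

  // New shape: one system per trial, reused by every invocation, terminated once.
  def perTrial(invocations: Int): Int = {
    live = 0
    val system = new FakeSystem          // analogous to the shared benchmark state
    (1 to invocations).foreach(_ => system.run())
    system.terminate()                   // analogous to @TearDown(Level.Trial)
    live
  }
}
```

    After 1000 invocations the first shape reports 1000 live systems and the
    second reports none.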
    
    Result:
    The benchmark now runs to completion and produces stable numbers, which is a
    prerequisite for measuring follow-up GraphInterpreter optimizations.
    
    Tests:
    sbt 'bench-jmh/compile'
    
    * test: use per-instance IdentityStage in InterpreterBenchmark
    
    Motivation:
    GraphStages.identity is a singleton whose Inlet/Outlet shape is shared across
    every reference. Chaining N copies into the assembly (numberOfIds = 5/10)
    collapses them to a single shape and mis-wires the connections. At runtime
    this surfaced as a flood of "Cannot pull port twice" errors during the
    benchmark and produced nonsense throughput numbers (5/10 stages reported as
    faster than 1).
    
    Modification:
    Define a local IdentityStage class with its own Inlet/Outlet per instance and
    use Vector.fill(numberOfIds)(new IdentityStage[Int]).
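    A minimal sketch of why the singleton collapses, assuming (hypothetically)
    that the assembly identifies ports by reference, as a plain Set of objects
    without a custom equals does. Port and Stage are illustrative types, not
    Pekko's Inlet/Outlet/GraphStage.

```scala
// Illustrative model only: Port and Stage are hypothetical, not Pekko classes.
object PortIdentityModel {
  final class Port(val name: String) // no equals override: compared by reference
  final class Stage(val in: Port, val out: Port)

  // Analogue of GraphStages.identity: one shared instance with one pair of ports.
  val sharedIdentity: Stage = new Stage(new Port("in"), new Port("out"))

  // Analogue of `new IdentityStage[Int]`: fresh ports for every instance.
  def freshIdentity(): Stage = new Stage(new Port("in"), new Port("out"))

  // How many distinct inlets an assembly built from these stages would see.
  def distinctInlets(stages: Seq[Stage]): Int = stages.map(_.in).toSet.size
}
```

    Vector.fill(5)(sharedIdentity) yields five references but one inlet, so every
    pull lands on the same port, consistent with the "Cannot pull port twice"
    errors; Vector.fill(5)(freshIdentity()) evaluates its argument five times and
    yields five inlets.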
    
    Result:
    The benchmark wires N distinct stages and produces stable, monotonic numbers
    (throughput decreases as numberOfIds grows, as expected).
    
    Tests:
    sbt 'bench-jmh/compile'
    
    * optimize: skip afterStageHasRun no-op finalize check in chase hot path
    
    Motivation:
    GraphInterpreter's chase loops dominate hot-path CPU in steady state — JMH
    stack profiling on InterpreterBenchmark attributes ~50% of stream-related
    samples to the two while loops at execute:449 / execute:460. Every chase
    iteration calls afterStageHasRun(activeStage), which in steady state always
    reads shutdownCounter(activeStage.stageId) and the per-stage finalized flag
    only to discover the stage has not just completed and skip the body. That is
    a per-event array load + null check + branch on the hottest path with no work
    to do, which the JIT cannot fold away because the array is mutable shared
    state.
    
    Modification:
    Track pendingFinalization: Boolean on the interpreter, set when a stage's
    shutdownCounter decrements to 0 in completeConnection or transitions to 0
    when KeepGoing is cleared in setKeepGoing. Gate the three hot-path
    afterStageHasRun calls in execute() (post normal-dispatch and the two chase
    loops) on the flag, resetting it before the call so cascaded completions
    during finalization re-arm the flag correctly. The lower-frequency callers
    (init, runAsyncInput) are left untouched.
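    A dependency-free sketch of the gating pattern, assuming a hypothetical
    MiniInterpreter that keeps only a shutdownCounter array and a finalized flag
    per stage. Names mirror the patch, but this is not the GraphInterpreter code,
    and the keep-going bit values (0x4000000 / 0x3ffffff) are assumptions chosen
    for illustration.

```scala
// Hypothetical model of the pendingFinalization gate; not Pekko's GraphInterpreter.
object DirtyFlagModel {
  val KeepGoingFlag: Int = 0x4000000 // assumed bit layout for illustration
  val KeepGoingMask: Int = 0x3ffffff

  final class MiniInterpreter(stageCount: Int, connectionsPerStage: Int) {
    val shutdownCounter: Array[Int] = Array.fill(stageCount)(connectionsPerStage)
    val finalized: Array[Boolean] = Array.fill(stageCount)(false)
    var pendingFinalization: Boolean = false
    var checks: Int = 0                   // how often the finalization check ran
    var onFinalize: Int => Unit = _ => () // hook that may cascade more completions

    def completeConnection(stageId: Int): Unit = {
      val active = shutdownCounter(stageId)
      if (active > 0) {
        val next = active - 1
        shutdownCounter(stageId) = next
        if (next == 0) pendingFinalization = true // stage just completed
      }
    }

    def setKeepGoing(stageId: Int, enabled: Boolean): Unit =
      if (enabled) shutdownCounter(stageId) |= KeepGoingFlag
      else {
        val next = shutdownCounter(stageId) & KeepGoingMask
        shutdownCounter(stageId) = next
        if (next == 0) pendingFinalization = true // clearing keep-going completed it
      }

    private def afterStageHasRun(stageId: Int): Unit = {
      checks += 1
      if (shutdownCounter(stageId) == 0 && !finalized(stageId)) {
        finalized(stageId) = true
        onFinalize(stageId)
      }
    }

    // One dispatch step: run the handler, then the finalization check only if armed.
    def dispatch(stageId: Int)(handler: => Unit): Unit = {
      handler
      if (pendingFinalization) {
        pendingFinalization = false // reset first so onFinalize can re-arm it
        afterStageHasRun(stageId)
      }
    }
  }
}
```

    In this model, 1000 dispatches with no completing stage run the check zero
    times; clearing keep-going on a stage with no open connections arms the flag;
    and a finalization hook that completes a neighbour's last connection leaves
    the flag armed for the next dispatch, because the flag is cleared before
    afterStageHasRun rather than after it.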
    
    Result:
    JMH on InterpreterBenchmark (JDK 25, G1, single thread, -i 5 -wi 3 -f 1 -t 1):
    
      numberOfIds   baseline (ops/ms)    with patch (ops/ms)    delta
      1             45238 ± 3143         50952 ± 4784           +12.6%
      5             10526 ±  151         11242 ±  288           +6.8%   (CIs disjoint)
      10             5350 ±  193          5927 ±  173           +10.8%  (CIs disjoint)
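    The delta column and the "CIs disjoint" notes can be rechecked from the table,
    treating each ± value as a symmetric error half-width around the mean (the way
    JMH prints its score error). BenchDelta is a hypothetical helper for this
    check only.

```scala
// Hypothetical helper to recheck the benchmark table; values copied from the table.
object BenchDelta {
  final case class Score(mean: Double, error: Double) {
    def lo: Double = mean - error
    def hi: Double = mean + error
  }

  // Relative change of the patched mean over the baseline mean, in percent.
  def deltaPercent(base: Score, patched: Score): Double =
    (patched.mean / base.mean - 1.0) * 100.0

  // True when the two [mean - error, mean + error] intervals do not overlap.
  def disjoint(a: Score, b: Score): Boolean = a.hi < b.lo || b.hi < a.lo

  // (numberOfIds, baseline, with patch)
  val rows: Seq[(Int, Score, Score)] = Seq(
    (1, Score(45238, 3143), Score(50952, 4784)),
    (5, Score(10526, 151), Score(11242, 288)),
    (10, Score(5350, 193), Score(5927, 173))
  )
}
```

    This gives roughly +12.6%, +6.8% and +10.8%. Only the numberOfIds = 5 and 10
    rows have non-overlapping intervals; the numberOfIds = 1 intervals overlap,
    which is presumably why that row carries no annotation.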
    
    Allocation rate stays at ~0.6 B/op — no GC impact. All stream-tests pass.
    
    Tests:
    - sbt 'stream/compile'
    - sbt 'stream/mimaReportBinaryIssues' - clean
    - sbt 'stream-tests/testOnly *fusing*' - 159 tests, all passed
    - sbt 'stream-tests/testOnly *Flow*Spec' - 1208 tests, all passed
    - sbt 'bench-jmh/Jmh/run -i 5 -wi 3 -f 1 -t 1 .*InterpreterBenchmark.*' - numbers above
    
    References:
    Refs #2985 - benchmark fix used to obtain trustworthy JMH numbers above.
---
 .../stream/impl/fusing/GraphInterpreter.scala      | 31 ++++++++++++++++++----
 1 file changed, 26 insertions(+), 5 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
index b774cfb6ab..aedcd0b301 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
@@ -259,6 +259,10 @@ import pekko.stream.stage._
   private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased
   private[this] var chasedPush: Connection = NoEvent
   private[this] var chasedPull: Connection = NoEvent
+  // Set whenever a stage's shutdownCounter transitions to 0 (i.e. the stage just became completed and
+  // needs finalization). Lets the chase / dispatch loops skip the per-iteration shutdownCounter array
+  // load in afterStageHasRun when no stage has completed since the last finalization pass.
+  private[this] var pendingFinalization: Boolean = false
 
   private def queueStatus: String = {
     val contents = (queueHead until queueTail).map(idx => {
@@ -420,7 +424,10 @@ import pekko.stream.stage._
         catch {
           case NonFatal(e) => reportStageError(e)
         }
-        afterStageHasRun(activeStage)
+        if (pendingFinalization) {
+          pendingFinalization = false
+          afterStageHasRun(activeStage)
+        }
 
         /*
          * "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or
@@ -453,7 +460,10 @@ import pekko.stream.stage._
           catch {
             case NonFatal(e) => reportStageError(e)
           }
-          afterStageHasRun(activeStage)
+          if (pendingFinalization) {
+            pendingFinalization = false
+            afterStageHasRun(activeStage)
+          }
         }
 
         // Chasing PULL events
@@ -464,7 +474,10 @@ import pekko.stream.stage._
           catch {
             case NonFatal(e) => reportStageError(e)
           }
-          afterStageHasRun(activeStage)
+          if (pendingFinalization) {
+            pendingFinalization = false
+            afterStageHasRun(activeStage)
+          }
         }
 
         if (chasedPush != NoEvent) {
@@ -627,12 +640,20 @@ import pekko.stream.stage._
   // itself might stop, too.
   private def completeConnection(stageId: Int): Unit = {
     val activeConnections = shutdownCounter(stageId)
-    if (activeConnections > 0) shutdownCounter(stageId) = activeConnections - 1
+    if (activeConnections > 0) {
+      val next = activeConnections - 1
+      shutdownCounter(stageId) = next
+      if (next == 0) pendingFinalization = true
+    }
   }
 
   private[stream] def setKeepGoing(logic: GraphStageLogic, enabled: Boolean): Unit =
     if (enabled) shutdownCounter(logic.stageId) |= KeepGoingFlag
-    else shutdownCounter(logic.stageId) &= KeepGoingMask
+    else {
+      val next = shutdownCounter(logic.stageId) & KeepGoingMask
+      shutdownCounter(logic.stageId) = next
+      if (next == 0) pendingFinalization = true
+    }
 
   @InternalStableApi
   private[stream] def finalizeStage(logic: GraphStageLogic): Unit = {

