This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new a5058437af optimize: skip afterStageHasRun no-op finalize check in GraphInterpreter chase hot path (#2986)
a5058437af is described below
commit a5058437af146430578a9c35c5e80f477f46692b
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu May 21 19:42:20 2026 +0800
optimize: skip afterStageHasRun no-op finalize check in GraphInterpreter chase hot path (#2986)
* test: stop leaking ActorSystem in InterpreterBenchmark per invocation
Motivation:
The previous shape `new GraphInterpreterSpecKit { new TestSetup { ... } }`
ran inside @Benchmark, so each invocation built (and never tore down) a fresh
ActorSystem. Long iterations exhausted native threads, and JMH reported empty
results once the JVM ran out of resources.
Modification:
Make the benchmark class itself extend GraphInterpreterSpecKit so JMH's
@State(Scope.Benchmark) lifecycle reuses one ActorSystem across all
invocations. Add @TearDown(Level.Trial) to terminate it cleanly.
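To make the lifecycle difference concrete, here is a minimal model in plain
Scala. It uses neither JMH nor Pekko: the hypothetical FakeSystem class stands
in for ActorSystem, LeakyBenchmark mimics the old per-invocation shape, and
SharedBenchmark mimics a @State(Scope.Benchmark) instance whose tearDown()
plays the role of a @TearDown(Level.Trial) method. It only counts how many
"systems" are still running.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ActorSystem: it only remembers whether it was stopped.
final class FakeSystem {
  private var running = true
  def isRunning: Boolean = running
  def terminate(): Unit = running = false
}

// Tracks every FakeSystem created so the demo can count the live ones.
object Registry {
  private val created = mutable.ArrayBuffer.empty[FakeSystem]
  def newSystem(): FakeSystem = { val s = new FakeSystem; created += s; s }
  def live: Int = created.count(_.isRunning)
  def reset(): Unit = created.clear()
}

// Old shape: each invocation builds a fresh system and never stops it.
final class LeakyBenchmark {
  def benchmark(): Int = {
    val system = Registry.newSystem()
    if (system.isRunning) 42 else 0 // placeholder for running the stream
  }
}

// New shape: one system per benchmark instance (the @State(Scope.Benchmark)
// lifecycle); tearDown() stands in for a @TearDown(Level.Trial) method.
final class SharedBenchmark {
  private val system = Registry.newSystem()
  def benchmark(): Int = if (system.isRunning) 42 else 0
  def tearDown(): Unit = system.terminate()
}

object LifecycleDemo {
  // Live systems after `n` invocations of the leaky shape.
  def leaky(n: Int): Int = {
    Registry.reset()
    val b = new LeakyBenchmark
    (1 to n).foreach(_ => b.benchmark())
    Registry.live
  }

  // Live systems during the trial and after teardown for the shared shape.
  def shared(n: Int): (Int, Int) = {
    Registry.reset()
    val b = new SharedBenchmark
    (1 to n).foreach(_ => b.benchmark())
    val during = Registry.live
    b.tearDown()
    (during, Registry.live)
  }
}
```

After 1000 invocations the leaky shape leaves 1000 systems running, while the
shared shape keeps one alive during the trial and none after teardown.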
Result:
The benchmark now runs to completion and produces stable numbers, which is a
prerequisite for measuring follow-up GraphInterpreter optimizations.
Tests:
sbt 'bench-jmh/compile'
* test: use per-instance IdentityStage in InterpreterBenchmark
Motivation:
GraphStages.identity is a singleton whose Inlet/Outlet shape is shared across
every reference. Chaining N copies into the assembly (numberOfIds = 5/10)
collapses them to a single shape and mis-wires the connections. This surfaced
at runtime as a flood of "Cannot pull port twice" errors during the benchmark
and produced nonsense throughput numbers (5/10 stages reported as faster
than 1).
Modification:
Define a local IdentityStage class with its own Inlet/Outlet per instance and
use Vector.fill(numberOfIds)(new IdentityStage[Int]).
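The shape-sharing problem can be reproduced without Pekko. In this sketch,
Port, Stage, SharedIdentity, IdentityStage and Wiring are hypothetical
stand-ins, and Port compares by reference in this model. The point is that
Vector.fill takes its element by name, so `new IdentityStage[Int]` is
evaluated once per slot, while a singleton object is simply repeated.

```scala
// Hypothetical port type; no equals override, so ports compare by reference.
final class Port(val name: String)

trait Stage {
  def in: Port
  def out: Port
}

// Singleton: every reference shares the same two ports (like a shared shape).
object SharedIdentity extends Stage {
  val in = new Port("identity.in")
  val out = new Port("identity.out")
}

// One fresh pair of ports per instance.
final class IdentityStage[T] extends Stage {
  val in = new Port("IdentityStage.in")
  val out = new Port("IdentityStage.out")
}

object Wiring {
  // Distinct inlets a chain actually exposes; a correct N-stage chain needs N.
  def distinctInlets(chain: Vector[Stage]): Int = chain.map(_.in).distinct.size
}
```

A chain built from the singleton exposes one inlet no matter how long it is,
while the per-instance class yields one inlet per stage.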
Result:
The benchmark wires N distinct stages and produces stable, monotonic numbers
(throughput decreases as numberOfIds grows, as expected).
Tests:
sbt 'bench-jmh/compile'
* optimize: skip afterStageHasRun no-op finalize check in chase hot path
Motivation:
GraphInterpreter's chase loops dominate hot-path CPU in steady state: JMH
stack profiling on InterpreterBenchmark attributes ~50% of stream-related
samples to the two while loops at execute:449 / execute:460. Every chase
iteration calls afterStageHasRun(activeStage), which in steady state always
reads shutdownCounter(activeStage.stageId) and the per-stage finalized flag
only to discover that the stage has not just completed and skip the body.
That is a per-event array load + null check + branch on the hottest path
with no work to do, and the JIT cannot fold it away because the array is
mutable shared state.
Modification:
Track pendingFinalization: Boolean on the interpreter, set when a stage's
shutdownCounter decrements to 0 in completeConnection or transitions to 0
when KeepGoing is cleared in setKeepGoing. Gate the three hot-path
afterStageHasRun calls in execute() (post normal-dispatch and the two chase
loops) on the flag, resetting it before the call so cascaded completions
during finalization re-arm the flag correctly. The low-frequency callers
(init, runAsyncInput) are left untouched.
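A minimal model of the gating, assuming a toy single-stage interpreter rather
than the real GraphInterpreter: MiniInterpreter, runEvent, checks,
finalizedStages and GatingDemo are hypothetical, while shutdownCounter,
pendingFinalization, completeConnection and afterStageHasRun borrow the
commit's names. It counts how often the post-run finalization check executes
with and without the flag.

```scala
final class MiniInterpreter(stageCount: Int, portsPerStage: Int, gated: Boolean) {
  private val shutdownCounter = Array.fill(stageCount)(portsPerStage)
  private val finalizedFlags = Array.fill(stageCount)(false)
  private var pendingFinalization = false

  var checks = 0                          // times afterStageHasRun actually ran
  var finalizedStages = List.empty[Int]

  // Decrement the open-connection count; arm the flag on the transition to 0.
  def completeConnection(stageId: Int): Unit = {
    val activeConnections = shutdownCounter(stageId)
    if (activeConnections > 0) {
      val next = activeConnections - 1
      shutdownCounter(stageId) = next
      if (next == 0) pendingFinalization = true
    }
  }

  // The check the commit wants to skip: array load plus branch.
  private def afterStageHasRun(stageId: Int): Unit = {
    checks += 1
    if (shutdownCounter(stageId) == 0 && !finalizedFlags(stageId)) {
      finalizedFlags(stageId) = true
      finalizedStages = stageId :: finalizedStages
    }
  }

  // One dispatched event: run the handler, then the (optionally gated) check.
  def runEvent(stageId: Int)(handler: => Unit): Unit = {
    handler
    if (!gated) afterStageHasRun(stageId)
    else if (pendingFinalization) {
      // Cleared before the check, as in the commit, so a completion raised
      // during finalization would set it again.
      pendingFinalization = false
      afterStageHasRun(stageId)
    }
  }
}

object GatingDemo {
  // 1000 ordinary events on stage 0, then stage 0 closes both of its ports.
  def run(gated: Boolean): (Int, List[Int]) = {
    val interp = new MiniInterpreter(stageCount = 1, portsPerStage = 2, gated = gated)
    (1 to 1000).foreach(_ => interp.runEvent(0)(()))
    interp.runEvent(0)(interp.completeConnection(0))
    interp.runEvent(0)(interp.completeConnection(0))
    (interp.checks, interp.finalizedStages)
  }
}
```

In this model the ungated version runs the check 1002 times and the gated one
runs it once, and both finalize stage 0. It does not try to reproduce cascaded
completions across stages.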
Result:
JMH on InterpreterBenchmark (JDK 25, G1, single thread, -i 5 -wi 3 -f 1 -t 1):
numberOfIds  baseline (ops/ms)  with patch (ops/ms)  delta
1            45238 ± 3143       50952 ± 4784         +12.6%
5            10526 ± 151        11242 ± 288          +6.8% (CIs disjoint)
10           5350 ± 193         5927 ± 173           +10.8% (CIs disjoint)
Allocation rate stays at ~0.6 B/op, so there is no GC impact. All stream-tests pass.
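The delta column and the "CIs disjoint" notes can be rechecked from the raw
numbers. This small sketch treats each "± e" as the half-width of the
reported interval; Measurement and BenchDelta are illustrative names.

```scala
final case class Measurement(score: Double, error: Double) {
  def low: Double = score - error
  def high: Double = score + error
}

object BenchDelta {
  // Percentage change from baseline to patched, rounded to one decimal place.
  def deltaPct(base: Measurement, patched: Measurement): Double =
    math.round((patched.score - base.score) / base.score * 1000) / 10.0

  // True when the two intervals do not overlap.
  def disjoint(a: Measurement, b: Measurement): Boolean =
    a.high < b.low || b.high < a.low

  // numberOfIds -> (baseline, with patch), in ops/ms as reported above.
  val rows: Map[Int, (Measurement, Measurement)] = Map(
    1 -> ((Measurement(45238, 3143), Measurement(50952, 4784))),
    5 -> ((Measurement(10526, 151), Measurement(11242, 288))),
    10 -> ((Measurement(5350, 193), Measurement(5927, 173)))
  )
}
```

Only the numberOfIds = 1 row has overlapping intervals (baseline upper bound
48381 vs. patched lower bound 46168), which matches the table marking only the
5 and 10 rows as disjoint.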
Tests:
- sbt 'stream/compile'
- sbt 'stream/mimaReportBinaryIssues' - clean
- sbt 'stream-tests/testOnly *fusing*' - 159 tests, all passed
- sbt 'stream-tests/testOnly *Flow*Spec' - 1208 tests, all passed
- sbt 'bench-jmh/Jmh/run -i 5 -wi 3 -f 1 -t 1 .*InterpreterBenchmark.*' - numbers above
References:
Refs #2985 - benchmark fix used to obtain trustworthy JMH numbers above.
---
.../stream/impl/fusing/GraphInterpreter.scala | 31 ++++++++++++++++++----
1 file changed, 26 insertions(+), 5 deletions(-)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
index b774cfb6ab..aedcd0b301 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
@@ -259,6 +259,10 @@ import pekko.stream.stage._
private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased
private[this] var chasedPush: Connection = NoEvent
private[this] var chasedPull: Connection = NoEvent
+ // Set whenever a stage's shutdownCounter transitions to 0 (i.e. the stage just became completed and
+ // needs finalization). Lets the chase / dispatch loops skip the per-iteration shutdownCounter array
+ // load in afterStageHasRun when no stage has completed since the last finalization pass.
+ private[this] var pendingFinalization: Boolean = false
private def queueStatus: String = {
val contents = (queueHead until queueTail).map(idx => {
@@ -420,7 +424,10 @@ import pekko.stream.stage._
catch {
case NonFatal(e) => reportStageError(e)
}
- afterStageHasRun(activeStage)
+ if (pendingFinalization) {
+ pendingFinalization = false
+ afterStageHasRun(activeStage)
+ }
/*
* "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or
@@ -453,7 +460,10 @@ import pekko.stream.stage._
catch {
case NonFatal(e) => reportStageError(e)
}
- afterStageHasRun(activeStage)
+ if (pendingFinalization) {
+ pendingFinalization = false
+ afterStageHasRun(activeStage)
+ }
}
// Chasing PULL events
@@ -464,7 +474,10 @@ import pekko.stream.stage._
catch {
case NonFatal(e) => reportStageError(e)
}
- afterStageHasRun(activeStage)
+ if (pendingFinalization) {
+ pendingFinalization = false
+ afterStageHasRun(activeStage)
+ }
}
if (chasedPush != NoEvent) {
@@ -627,12 +640,20 @@ import pekko.stream.stage._
// itself might stop, too.
private def completeConnection(stageId: Int): Unit = {
val activeConnections = shutdownCounter(stageId)
- if (activeConnections > 0) shutdownCounter(stageId) = activeConnections - 1
+ if (activeConnections > 0) {
+ val next = activeConnections - 1
+ shutdownCounter(stageId) = next
+ if (next == 0) pendingFinalization = true
+ }
}
private[stream] def setKeepGoing(logic: GraphStageLogic, enabled: Boolean): Unit =
if (enabled) shutdownCounter(logic.stageId) |= KeepGoingFlag
- else shutdownCounter(logic.stageId) &= KeepGoingMask
+ else {
+ val next = shutdownCounter(logic.stageId) & KeepGoingMask
+ shutdownCounter(logic.stageId) = next
+ if (next == 0) pendingFinalization = true
+ }
@InternalStableApi
private[stream] def finalizeStage(logic: GraphStageLogic): Unit = {
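The setKeepGoing hunk relies on KeepGoingFlag being a bit outside the
open-connection count, so clearing it can be the moment the counter reaches 0
even though no connection closed at that point. The following sketch
illustrates that transition; the bit values are assumptions chosen for the
example, not constants copied from GraphInterpreter, and Counters is a
hypothetical single-stage holder.

```scala
object KeepGoingBits {
  // Assumed layout: one high "keep going" bit above the open-connection count.
  val KeepGoingFlag: Int = 1 << 26
  val KeepGoingMask: Int = KeepGoingFlag - 1 // every bit below the flag

  final class Counters(initialOpenPorts: Int) {
    var shutdownCounter: Int = initialOpenPorts
    var pendingFinalization: Boolean = false

    def setKeepGoing(enabled: Boolean): Unit =
      if (enabled) shutdownCounter |= KeepGoingFlag
      else {
        val next = shutdownCounter & KeepGoingMask
        shutdownCounter = next
        // Ports were already all closed; dropping the flag completes the stage.
        if (next == 0) pendingFinalization = true
      }

    // Mirrors completeConnection: while the flag is set, `next` stays non-zero.
    def completeConnection(): Unit = {
      val activeConnections = shutdownCounter
      if (activeConnections > 0) {
        val next = activeConnections - 1
        shutdownCounter = next
        if (next == 0) pendingFinalization = true
      }
    }
  }
}
```

With keep-going enabled, closing the last port leaves only the flag bit set
and does not arm pendingFinalization; clearing the flag afterwards does.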