This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 956d478919 test: fix InterpreterBenchmark so it produces trustworthy numbers (#2985)
956d478919 is described below

commit 956d478919f7183fce7ca49f225934a9b6b443ad
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu May 21 17:14:39 2026 +0800

    test: fix InterpreterBenchmark so it produces trustworthy numbers (#2985)
    
    * test: stop leaking ActorSystem in InterpreterBenchmark per invocation
    
    Motivation:
    The previous shape `new GraphInterpreterSpecKit { new TestSetup { ... } }` ran
    inside @Benchmark, so each invocation built (and never tore down) a fresh
    ActorSystem. Long iterations exhausted native threads and JMH reported empty
    results once the JVM ran out of resources.
    
    Modification:
    Make the benchmark class itself extend GraphInterpreterSpecKit so JMH's
    @State(Scope.Benchmark) lifecycle reuses one ActorSystem across all
    invocations. Add @TearDown(Level.Trial) to terminate it cleanly.
    
    Result:
    The benchmark now runs to completion and produces stable numbers, which is a
    prerequisite for measuring follow-up GraphInterpreter optimizations.
    
    Tests:
    sbt 'bench-jmh/compile'
    
    * test: use per-instance IdentityStage in InterpreterBenchmark
    
    Motivation:
    GraphStages.identity is a singleton whose Inlet/Outlet shape is shared across
    every reference. Chaining N copies into the assembly (numberOfIds = 5/10)
    collapses to a single shape and mis-wires the connections, which surfaced as
    runtime "Cannot pull port twice" error spam during the benchmark and produced
    nonsense throughput numbers (5/10 stages reported as faster than 1).
    
    Modification:
    Define a local IdentityStage class with its own Inlet/Outlet per instance and
    use Vector.fill(numberOfIds)(new IdentityStage[Int]).
    
    Result:
    The benchmark wires N distinct stages and produces stable, monotonic numbers
    (throughput decreases as numberOfIds grows, as expected).
    
    Tests:
    sbt 'bench-jmh/compile'
---
 .../apache/pekko/stream/InterpreterBenchmark.scala | 69 ++++++++++++++--------
 1 file changed, 45 insertions(+), 24 deletions(-)

diff --git 
a/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala
index 30724ab775..2045852a69 100644
--- 
a/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala
+++ 
b/bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala
@@ -15,19 +15,20 @@ package org.apache.pekko.stream
 
 import java.util.concurrent.TimeUnit
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import org.openjdk.jmh.annotations._
 
 import org.apache.pekko
-import pekko.event._
 import pekko.stream.impl.fusing.GraphInterpreter.{ 
DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
 import pekko.stream.impl.fusing.GraphInterpreterSpecKit
-import pekko.stream.impl.fusing.GraphStages
 import pekko.stream.stage._
 
 @State(Scope.Benchmark)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 @BenchmarkMode(Array(Mode.Throughput))
-class InterpreterBenchmark {
+class InterpreterBenchmark extends GraphInterpreterSpecKit {
   import InterpreterBenchmark._
 
   // manual, and not via @Param, because we want @OperationsPerInvocation on our tests
@@ -36,32 +37,59 @@ class InterpreterBenchmark {
   @Param(Array("1", "5", "10"))
   var numberOfIds: Int = 0
 
+  // Earlier this benchmark instantiated `new GraphInterpreterSpecKit` inside @Benchmark, which
+  // created (and leaked) a fresh ActorSystem on every invocation and would exhaust native threads
+  // on long runs. Extending the SpecKit means JMH's @State(Scope.Benchmark) lifecycle reuses a
+  // single ActorSystem across all invocations.
+
+  @TearDown(Level.Trial)
+  def shutdown(): Unit = {
+    Await.result(system.terminate(), 10.seconds)
+  }
+
   @Benchmark
   @OperationsPerInvocation(100000)
   def graph_interpreter_100k_elements(): Unit = {
-    new GraphInterpreterSpecKit {
-      new TestSetup {
-        val identities = Vector.fill(numberOfIds)(GraphStages.identity[Int])
-        val source = new GraphDataSource("source", data100k)
-        val sink = new GraphDataSink[Int]("sink", data100k.size)
-
-        val b = builder(identities: _*).connect(source, 
identities.head.in).connect(identities.last.out, sink)
+    new TestSetup {
+      val identities = Vector.fill(numberOfIds)(new IdentityStage[Int])
+      val source = new GraphDataSource("source", data100k)
+      val sink = new GraphDataSink[Int]("sink", data100k.size)
 
-        // FIXME: This should not be here, this is pure setup overhead
-        for (i <- 0 until identities.size - 1) {
-          b.connect(identities(i).out, identities(i + 1).in)
-        }
+      val b = builder(identities: _*).connect(source, 
identities.head.in).connect(identities.last.out, sink)
 
-        b.init()
-        sink.requestOne()
-        interpreter.execute(Int.MaxValue)
+      // FIXME: This should not be here, this is pure setup overhead
+      for (i <- 0 until identities.size - 1) {
+        b.connect(identities(i).out, identities(i + 1).in)
       }
+
+      b.init()
+      sink.requestOne()
+      interpreter.execute(Int.MaxValue)
     }
   }
 }
 
 object InterpreterBenchmark {
 
+  /**
+   * Per-instance identity stage. Cannot reuse [[GraphStages.identity]] 
because it is a singleton
+   * whose Inlet/Outlet shape is shared across all references — chaining N 
copies of the singleton
+   * collapses to a single shape and mis-wires the assembly (manifests as 
`Cannot pull port twice`).
+   */
+  final class IdentityStage[T] extends GraphStage[FlowShape[T, T]] {
+    val in = Inlet[T]("Identity.in")
+    val out = Outlet[T]("Identity.out")
+    override val shape: FlowShape[T, T] = FlowShape(in, out)
+
+    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic 
=
+      new GraphStageLogic(shape) with InHandler with OutHandler {
+        override def onPush(): Unit = push(out, grab(in))
+        override def onPull(): Unit = pull(in)
+        setHandler(in, this)
+        setHandler(out, this)
+      }
+  }
+
   case class GraphDataSource[T](override val toString: String, data: 
Vector[T]) extends UpstreamBoundaryStageLogic[T] {
     var idx: Int = 0
     override val out: pekko.stream.Outlet[T] = Outlet[T]("out")
@@ -98,11 +126,4 @@ object InterpreterBenchmark {
 
     def requestOne(): Unit = pull(in)
   }
-
-  val NoopBus = new LoggingBus {
-    override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = 
true
-    override def publish(event: Event): Unit = ()
-    override def unsubscribe(subscriber: Subscriber, from: Classifier): 
Boolean = true
-    override def unsubscribe(subscriber: Subscriber): Unit = ()
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to