This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new b95809da0a Fuse restInHandler into GraphStageLogic in Intersperse (#2981)
b95809da0a is described below

commit b95809da0a7ea923d95da175f0716cd336cb009d
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu May 21 09:59:38 2026 +0800

    Fuse restInHandler into GraphStageLogic in Intersperse (#2981)
    
    * Fuse InHandler/OutHandler into GraphStageLogic to reduce allocations
    
    Optimize several stream stages by fusing separate InHandler/OutHandler
    objects into the GraphStageLogic itself, reducing per-materialization
    memory allocations:
    
    1. DoOnFirst: Replace separate InHandler with boolean flag pattern
    2. Intersperse: Merge two InHandler objects (start/rest) into a single fused handler
    3. Reduce: Eliminate initial InHandler by using hasFirst flag
    4. DropWithin: Replace post-timeout InHandler with timedOut flag
    5. Scan: Remove initial InHandler/OutHandler pair using initialized flag
    6. ScanAsync: Remove ZeroHandler and post-completion OutHandler
    
    Tests:
    - stream-tests/testOnly FlowDoOnFirstSpec: 2/2 passed
    - stream-tests/testOnly FlowIntersperseSpec: 8/8 passed
    - stream-tests/testOnly FlowReduceSpec: 12/12 passed
    - stream-tests/testOnly FlowDropWithinSpec: 2/2 passed
    - stream-tests/testOnly FlowScanSpec: 8/8 passed
    - stream-tests/testOnly FlowScanAsyncSpec: 19/19 passed
    
    * Revert "Fuse InHandler/OutHandler into GraphStageLogic to reduce allocations"
    
    This reverts commit f9f03708b540cd9914018780447e2818434cc6c2.
    
    * Fuse restInHandler into GraphStageLogic in Intersperse
    
    Optimize Intersperse stage by fusing the hot-path restInHandler into
    GraphStageLogic itself, reducing per-materialization object allocation.
    
    Before: GraphStageLogic with OutHandler + startInHandler + restInHandler
    After:  GraphStageLogic with InHandler with OutHandler + startInHandler
    
    The cold-path startInHandler (only used once for first element) remains
    as a separate object. The hot-path restInHandler logic is now directly
    in the fused GraphStageLogic, eliminating one InHandler allocation per
    materialization without adding any boolean checks on the hot path.
    
    Tests:
    - stream-tests/testOnly FlowIntersperseSpec: 8/8 passed
---
 .../org/apache/pekko/stream/impl/fusing/Ops.scala  | 37 +++++++++++-----------
 1 file changed, 19 insertions(+), 18 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index a0747c2b99..a16955d24a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -761,35 +761,36 @@ private[stream] object Collect {
   if (start.isDefined) 
ReactiveStreamsCompliance.requireNonNullElement(start.get)
   if (end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get)
 
-  override def createLogic(attr: Attributes): GraphStageLogic = new 
GraphStageLogic(shape) with OutHandler {
-    val startInHandler = new InHandler {
-      override def onPush(): Unit = {
-        // if else (to avoid using Iterator[T].flatten in hot code)
-        if (start.isDefined) emitMultiple(out, Iterator(start.get, grab(in)))
-        else emit(out, grab(in))
-        setHandler(in, restInHandler) // switch handler
-      }
+  override def createLogic(attr: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler { self =>
+      // Cold path - only used once for the first element
+      val startInHandler = new InHandler {
+        override def onPush(): Unit = {
+          // if else (to avoid using Iterator[T].flatten in hot code)
+          if (start.isDefined) emitMultiple(out, Iterator(start.get, grab(in)))
+          else emit(out, grab(in))
+          setHandler(in, self) // only switch in handler, don't touch out 
handler
+        }
 
-      override def onUpstreamFinish(): Unit = {
-        emitMultiple(out, Iterator(start, end).flatten)
-        completeStage()
+        override def onUpstreamFinish(): Unit = {
+          emitMultiple(out, Iterator(start, end).flatten)
+          completeStage()
+        }
       }
-    }
 
-    val restInHandler = new InHandler {
+      // Hot path - fused into GraphStageLogic, no separate object allocation
       override def onPush(): Unit = emitMultiple(out, Iterator(inject, 
grab(in)))
 
       override def onUpstreamFinish(): Unit = {
         if (end.isDefined) emit(out, end.get)
         completeStage()
       }
-    }
 
-    def onPull(): Unit = pull(in)
+      override def onPull(): Unit = pull(in)
 
-    setHandler(in, startInHandler)
-    setHandler(out, this)
-  }
+      setHandler(in, startInHandler)
+      setHandler(out, this)
+    }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to