This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new b95809da0a Fuse restInHandler into GraphStageLogic in Intersperse
(#2981)
b95809da0a is described below
commit b95809da0a7ea923d95da175f0716cd336cb009d
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu May 21 09:59:38 2026 +0800
Fuse restInHandler into GraphStageLogic in Intersperse (#2981)
* Fuse InHandler/OutHandler into GraphStageLogic to reduce allocations
Optimize several stream stages by fusing separate InHandler/OutHandler
objects into the GraphStageLogic itself, reducing per-materialization
memory allocations:
1. DoOnFirst: Replace separate InHandler with boolean flag pattern
2. Intersperse: Merge two InHandler objects (start/rest) into single fused
handler
3. Reduce: Eliminate initial InHandler by using hasFirst flag
4. DropWithin: Replace post-timeout InHandler with timedOut flag
5. Scan: Remove initial InHandler/OutHandler pair using initialized flag
6. ScanAsync: Remove ZeroHandler and post-completion OutHandler
Tests:
- stream-tests/testOnly FlowDoOnFirstSpec: 2/2 passed
- stream-tests/testOnly FlowIntersperseSpec: 8/8 passed
- stream-tests/testOnly FlowReduceSpec: 12/12 passed
- stream-tests/testOnly FlowDropWithinSpec: 2/2 passed
- stream-tests/testOnly FlowScanSpec: 8/8 passed
- stream-tests/testOnly FlowScanAsyncSpec: 19/19 passed
* Revert "Fuse InHandler/OutHandler into GraphStageLogic to reduce
allocations"
This reverts commit f9f03708b540cd9914018780447e2818434cc6c2.
* Fuse restInHandler into GraphStageLogic in Intersperse
Optimize Intersperse stage by fusing the hot-path restInHandler into
GraphStageLogic itself, reducing per-materialization object allocation.
Before: GraphStageLogic with OutHandler + startInHandler + restInHandler
After: GraphStageLogic with InHandler with OutHandler + startInHandler
The cold-path startInHandler (only used once for first element) remains
as a separate object. The hot-path restInHandler logic is now directly
in the fused GraphStageLogic, eliminating one InHandler allocation per
materialization without adding any boolean checks on the hot path.
Tests:
- stream-tests/testOnly FlowIntersperseSpec: 8/8 passed
---
.../org/apache/pekko/stream/impl/fusing/Ops.scala | 37 +++++++++++-----------
1 file changed, 19 insertions(+), 18 deletions(-)
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index a0747c2b99..a16955d24a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -761,35 +761,36 @@ private[stream] object Collect {
if (start.isDefined)
ReactiveStreamsCompliance.requireNonNullElement(start.get)
if (end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get)
- override def createLogic(attr: Attributes): GraphStageLogic = new
GraphStageLogic(shape) with OutHandler {
- val startInHandler = new InHandler {
- override def onPush(): Unit = {
- // if else (to avoid using Iterator[T].flatten in hot code)
- if (start.isDefined) emitMultiple(out, Iterator(start.get, grab(in)))
- else emit(out, grab(in))
- setHandler(in, restInHandler) // switch handler
- }
+ override def createLogic(attr: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler { self =>
+ // Cold path - only used once for the first element
+ val startInHandler = new InHandler {
+ override def onPush(): Unit = {
+ // if else (to avoid using Iterator[T].flatten in hot code)
+ if (start.isDefined) emitMultiple(out, Iterator(start.get, grab(in)))
+ else emit(out, grab(in))
+ setHandler(in, self) // only switch in handler, don't touch out
handler
+ }
- override def onUpstreamFinish(): Unit = {
- emitMultiple(out, Iterator(start, end).flatten)
- completeStage()
+ override def onUpstreamFinish(): Unit = {
+ emitMultiple(out, Iterator(start, end).flatten)
+ completeStage()
+ }
}
- }
- val restInHandler = new InHandler {
+ // Hot path - fused into GraphStageLogic, no separate object allocation
override def onPush(): Unit = emitMultiple(out, Iterator(inject,
grab(in)))
override def onUpstreamFinish(): Unit = {
if (end.isDefined) emit(out, end.get)
completeStage()
}
- }
- def onPull(): Unit = pull(in)
+ override def onPull(): Unit = pull(in)
- setHandler(in, startInHandler)
- setHandler(out, this)
- }
+ setHandler(in, startInHandler)
+ setHandler(out, this)
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]