This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch optimize-internal-concat-value-presented
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit e506bfffcf6076af5c03a3fdc5738f1169cb4377
Author: He-Pin <[email protected]>
AuthorDate: Sun May 17 21:27:38 2026 +0800

    optimize: extend internalConcat dispatch for value-presented sources
    
    Motivation:
    `FlowOps#internalConcat` previously had only one fast-path: `SingleSource`
    on the right-hand side was rerouted through the lightweight `SingleConcat`
    stage instead of the general two-port `Concat[U](2, detached)` fan-in graph
    (which materializes the whole substream plus a detacher buffer). All other
    value-presented sources (`IterableSource`, `IteratorSource`, `RangeSource`,
    `RepeatSource`, `JavaStreamSource`, `FutureSource`, `FailedSource`) still
    took the heavy `concatGraph` path even though their data is already in
    memory or trivially producible — the fan-in machinery and substream
    materialization were pure overhead. Heavy `concat` users (pekko-http and
    others) carry that cost on every materialization.
    
    Modification:
    Add four small specialized `GraphStage[FlowShape[E, E]]` siblings of
    `SingleConcat`, each passing through elements while upstream is alive and
    draining its captured value-presented payload on `onUpstreamFinish`:
    
      - `IterableConcat[E](createIterator)` — emits via `emitMultiple`, covers
        `IterableSource`, `IteratorSource`, `RangeSource`, `JavaStreamSource`.
      - `RepeatConcat[E](elem)` — swaps `OutHandler` so each `onPull` pushes
        `elem`, covers `RepeatSource`.
      - `FailedConcat[E](failure)` — calls `failStage(failure)`, covers
        `FailedSource`.
      - `FutureConcat[E](future)` — emits/fails for completed futures, otherwise
        swaps `OutHandler` (to avoid pulling the now-closed `in` port) and
        registers an async callback that resolves once the future completes.
    
    `internalConcat` is extended to dispatch via
    `TraversalBuilder.getValuePresentedSource` and pattern-match the eight
    value-presented source types (existing `SingleSource` path is preserved).
    The `detached` flag is irrelevant for these stages — the right-hand data is
    already present, so the one-element pre-fetch buffer that `detached=true`
    provides has nothing to fetch (matching `SingleConcat`'s precedent).
    
    Result:
    For the eight value-presented source types, `concat` and `concatLazy` no
    longer pay for substream materialization or the two-port fan-in graph.
    Observable behavior is unchanged for all other sources, which still take the
    existing `concatGraph` path. Ten directional tests added to
    `AbstractFlowConcatSpec` cover each new dispatch and assert (a) values
    delivered correctly and (b) zero substream materialization for value-
    presented sources. All `*FlowConcatSpec`, `*FlowConcatLazySpec`,
    `*FlowConcatAllSpec`, `*FlowConcatAllLazySpec`, and `*GraphConcatSpec` pass.
    MiMa is clean (all new stages are `private[pekko]` / `InternalApi`).
---
 .../pekko/stream/scaladsl/FlowConcatSpec.scala     | 112 +++++++++++++++++++++
 .../apache/pekko/stream/impl/FailedConcat.scala    |  52 ++++++++++
 .../apache/pekko/stream/impl/FutureConcat.scala    |  73 ++++++++++++++
 .../apache/pekko/stream/impl/IterableConcat.scala  |  53 ++++++++++
 .../apache/pekko/stream/impl/RepeatConcat.scala    |  59 +++++++++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    |  32 +++++-
 6 files changed, 378 insertions(+), 3 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala
index acae1ae812..147628bd52 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala
@@ -13,11 +13,15 @@
 
 package org.apache.pekko.stream.scaladsl
 
+import java.util.Collections
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.Await
+import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
 
 import org.apache.pekko
 import pekko.NotUsed
@@ -247,6 +251,114 @@ abstract class AbstractFlowConcatSpec extends 
BaseTwoStreamsSetup {
 
       concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
     }
+
+    "optimize iterable concat" in {
+      val s1 = Source.single(1)
+      val s2 = Source(List(2, 3, 4))
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      concat.traversalBuilder.pendingBuilder.toString should 
include("IterableConcat")
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 4))
+    }
+
+    "optimize range concat" in {
+      val s1 = Source.single(1)
+      val s2 = Source(2 to 4)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      concat.traversalBuilder.pendingBuilder.toString should 
include("IterableConcat")
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 4))
+    }
+
+    "optimize iterator concat" in {
+      val s1 = Source.single(1)
+      val s2 = Source.fromIterator(() => Iterator(2, 3, 4))
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      concat.traversalBuilder.pendingBuilder.toString should 
include("IterableConcat")
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 4))
+    }
+
+    "optimize java-stream concat" in {
+      val s1 = Source.single(1)
+      val s2 = Source.fromJavaStream(() => Collections.singleton(2: 
Integer).stream()).map(_.intValue())
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      // map() is not value-presented, so the optimization should not kick in 
for s2 here.
+      // To exercise the optimization, build a JavaStream source whose 
value-presented form survives.
+      val s2Direct = Source.fromJavaStream(() => Collections.singleton(2: 
Integer).stream())
+      val concatDirect: Source[Integer, _] =
+        if (eager) Source.single[Integer](1).concat(s2Direct) else 
Source.single[Integer](1).concatLazy(s2Direct)
+      concatDirect.traversalBuilder.pendingBuilder.toString should 
include("IterableConcat")
+
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
+    }
+
+    "optimize repeat concat" in {
+      val s1 = Source(1 to 3)
+      val s2 = Source.repeat(0)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      concat.traversalBuilder.pendingBuilder.toString should 
include("RepeatConcat(0)")
+      concat.take(6).runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 0, 
0, 0))
+    }
+
+    "optimize failed concat" in {
+      val ex = new RuntimeException("boom") with NoStackTrace
+      val s1 = Source.single(1)
+      val s2: Source[Int, NotUsed] = Source.failed(ex)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      concat.traversalBuilder.pendingBuilder.toString should 
include("FailedConcat")
+      concat.runWith(Sink.seq).failed.futureValue should ===(ex)
+    }
+
+    "optimize completed-future concat" in {
+      // `Source.future(Future.successful(x))` is itself optimized to a 
`SingleSource`,
+      // so the dispatch lands on `SingleConcat` rather than `FutureConcat`.
+      val s1 = Source.single(1)
+      val s2 = Source.future(Future.successful(2))
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      concat.traversalBuilder.pendingBuilder.toString should 
include("SingleConcat(2)")
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
+    }
+
+    "optimize pending-future concat" in {
+      val promise = Promise[Int]()
+      val s1 = Source.single(1)
+      val s2 = Source.future(promise.future)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      concat.traversalBuilder.pendingBuilder.toString should 
include("FutureConcat")
+      val resultF = concat.runWith(Sink.seq)
+      promise.success(2)
+      resultF.futureValue should ===(Seq(1, 2))
+    }
+
+    "optimize failed-future concat" in {
+      // `Source.future(Future.failed(ex))` is itself optimized to a 
`FailedSource`,
+      // so the dispatch lands on `FailedConcat` rather than `FutureConcat`.
+      val ex = new RuntimeException("future-boom") with NoStackTrace
+      val s1 = Source.single(1)
+      val s2 = Source.future(Future.failed[Int](ex))
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      concat.traversalBuilder.pendingBuilder.toString should 
include("FailedConcat")
+      concat.runWith(Sink.seq).failed.futureValue should ===(ex)
+    }
+
+    "avoid downstream substream materialization for value-presented sources" 
in {
+      // Count the elements that pass through the `map` on s1; only s1's single 
element should be counted.
+      // NOTE(review): the counter sits on s1, so it does not observe whether a 
substream is materialized for s2.
+      val materializationCounter = new AtomicInteger(0)
+      val s1 = Source.single(1).map { v => 
materializationCounter.incrementAndGet(); v }
+      val s2 = Source(2 to 4)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 4))
+      materializationCounter.get() should ===(1) // one for the upstream s1 
element only
+    }
   }
 }
 
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala
new file mode 100644
index 0000000000..4aaa0ae6fd
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler 
}
+
+/**
+ * Concatenating a `Source.failed` to a stream is common enough that it 
warrants this
+ * optimization which avoids the actual fan-in for such cases. After upstream
+ * finishes, the stage fails with the captured cause.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class FailedConcat[E](failure: Throwable) extends 
GraphStage[FlowShape[E, E]] {
+
+  val in = Inlet[E]("FailedConcat.in")
+  val out = Outlet[E]("FailedConcat.out")
+
+  override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      override def onPush(): Unit = push(out, grab(in))
+
+      override def onPull(): Unit = pull(in)
+
+      override def onUpstreamFinish(): Unit = failStage(failure)
+
+      setHandlers(in, out, this)
+    }
+
+  override def toString: String = s"FailedConcat($failure)"
+}
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala
new file mode 100644
index 0000000000..59b1523e5e
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.util.{ Failure, Success, Try }
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler 
}
+
+/**
+ * Concatenating a `Source.future` to a stream is common enough that it 
warrants this
+ * optimization which avoids the actual fan-in for such cases. After upstream
+ * finishes, the stage emits the future's resolved value (or fails) without
+ * paying for substream materialization. Pending futures register an async 
callback
+ * that fires once the future completes.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class FutureConcat[E](future: Future[E]) extends 
GraphStage[FlowShape[E, E]] {
+
+  val in = Inlet[E]("FutureConcat.in")
+  val out = Outlet[E]("FutureConcat.out")
+
+  override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      override def onPush(): Unit = push(out, grab(in))
+
+      override def onPull(): Unit = pull(in)
+
+      override def onUpstreamFinish(): Unit =
+        future.value match {
+          case Some(completed) => handle(completed)
+          case None            =>
+            // Avoid pulling the now-closed `in` while the future is pending.
+            setHandler(out, new OutHandler {
+              override def onPull(): Unit = () // wait for the async callback
+            })
+            val cb = getAsyncCallback[Try[E]](handle).invoke _
+            future.onComplete(cb)(ExecutionContext.parasitic)
+        }
+
+      private def handle(result: Try[E]): Unit = result match {
+        case Success(null) => completeStage()
+        case Success(v)    => emit(out, v, () => completeStage())
+        case Failure(ex)   => failStage(ex)
+      }
+
+      setHandlers(in, out, this)
+    }
+
+  override def toString: String = "FutureConcat"
+}
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala
new file mode 100644
index 0000000000..013bda996c
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler 
}
+
+/**
+ * Concatenating an iterable / iterator / range / java-stream-backed source to 
a stream
+ * is common enough that it warrants this optimization which avoids the actual 
fan-in
+ * for value-presented sources.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class IterableConcat[E](createIterator: () => 
Iterator[E]) extends GraphStage[FlowShape[E, E]] {
+
+  val in = Inlet[E]("IterableConcat.in")
+  val out = Outlet[E]("IterableConcat.out")
+
+  override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      override def onPush(): Unit = push(out, grab(in))
+
+      override def onPull(): Unit = pull(in)
+
+      override def onUpstreamFinish(): Unit =
+        emitMultiple(out, createIterator(), () => completeStage())
+
+      setHandlers(in, out, this)
+    }
+
+  override def toString: String = "IterableConcat"
+}
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/RepeatConcat.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/RepeatConcat.scala
new file mode 100644
index 0000000000..2ea912e0a1
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/RepeatConcat.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler 
}
+
+/**
+ * Concatenating a `Source.repeat` to a stream is common enough that it 
warrants this
+ * optimization which avoids the actual fan-in for such cases. After upstream
+ * finishes, the stage indefinitely emits the cached element on every pull 
until
+ * downstream cancels.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class RepeatConcat[E](elem: E) extends 
GraphStage[FlowShape[E, E]] {
+
+  val in = Inlet[E]("RepeatConcat.in")
+  val out = Outlet[E]("RepeatConcat.out")
+
+  override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      override def onPush(): Unit = push(out, grab(in))
+
+      override def onPull(): Unit = pull(in)
+
+      override def onUpstreamFinish(): Unit = {
+        setHandler(out,
+          new OutHandler {
+            override def onPull(): Unit = push(out, elem)
+          })
+        if (isAvailable(out)) push(out, elem)
+      }
+
+      setHandlers(in, out, this)
+    }
+
+  override def toString: String = s"RepeatConcat($elem)"
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 6e498d1ca5..c1607f71cf 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -32,8 +32,14 @@ import pekko.event.LoggingAdapter
 import pekko.event.MarkerLoggingAdapter
 import pekko.stream._
 import pekko.stream.Attributes.SourceLocation
+import pekko.stream.impl.FailedConcat
+import pekko.stream.impl.FailedSource
+import pekko.stream.impl.FutureConcat
+import pekko.stream.impl.IterableConcat
+import pekko.stream.impl.JavaStreamSource
 import pekko.stream.impl.LinearTraversalBuilder
 import pekko.stream.impl.ProcessorModule
+import pekko.stream.impl.RepeatConcat
 import pekko.stream.impl.SetupFlowStage
 import pekko.stream.impl.SingleConcat
 import pekko.stream.impl.Stages.DefaultAttributes
@@ -44,6 +50,7 @@ import pekko.stream.impl.TraversalBuilder
 import pekko.stream.impl.fusing
 import pekko.stream.impl.fusing._
 import pekko.stream.impl.fusing.FlattenMerge
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, 
SingleSource }
 import pekko.stream.stage._
 import pekko.util.ConstantFun
 import pekko.util.OptionVal
@@ -3808,9 +3815,28 @@ trait FlowOps[+Out, +Mat] {
     that match {
       case source if TraversalBuilder.isEmptySource(source) => 
this.asInstanceOf[Repr[U]]
       case other                                            =>
-        TraversalBuilder.getSingleSource(other) match {
-          case OptionVal.Some(singleSource) =>
-            via(new SingleConcat(singleSource.elem.asInstanceOf[U]))
+        TraversalBuilder.getValuePresentedSource(other) match {
+          case OptionVal.Some(graph) =>
+            graph match {
+              case single: SingleSource[U] @unchecked =>
+                via(new SingleConcat(single.elem))
+              case iterable: IterableSource[U] @unchecked =>
+                via(new IterableConcat[U](() => iterable.elements.iterator))
+              case iterator: IteratorSource[U] @unchecked =>
+                via(new IterableConcat[U](iterator.createIterator))
+              case range: RangeSource[U] @unchecked =>
+                via(new IterableConcat[U](() => 
range.range.iterator.asInstanceOf[Iterator[U]]))
+              case javaStream: JavaStreamSource[U, _] @unchecked =>
+                import scala.jdk.CollectionConverters._
+                via(new IterableConcat[U](() => 
javaStream.open().iterator.asScala))
+              case repeat: RepeatSource[U] @unchecked =>
+                via(new RepeatConcat[U](repeat.elem))
+              case futureSource: FutureSource[U] @unchecked =>
+                via(new FutureConcat[U](futureSource.future))
+              case failed: FailedSource[U] @unchecked =>
+                via(new FailedConcat[U](failed.failure))
+              case _ => via(concatGraph(other, detached))
+            }
           case _ => via(concatGraph(other, detached))
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to