This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 415c4ae760 optimize: extend internalConcat dispatch for value-presented sources (#2978)
415c4ae760 is described below

commit 415c4ae76024b2e4d1873efc9c028c9ea838757a
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu May 21 07:02:43 2026 +0800

    optimize: extend internalConcat dispatch for value-presented sources (#2978)
    
    * optimize: extend internalConcat dispatch for value-presented sources
    
    Motivation:
    PR #2977 added value-presented source dispatch for FlattenConcat /
    FlattenMerge so that materializing a substream is skipped when the inner
    source already carries its element(s) inline. The same opportunity exists
    for the lazy concat operator (`concatLazy`), where the second source is often a
    `Source.single`, `Source.future`, an iterable, a range, a Java stream,
    `Source.repeat`, or a failed source. Materializing a fan-in graph for any
    of these is pure overhead.
    
    `concat` (the eager / detached variant) intentionally retains the
    substream-materializing path so that `Concat(_, detachedInputs = true)`
    + `Detacher` semantics — eager pull at materialization, one-element
    prefetch buffer, deadlock-breaking for cyclic graphs — are preserved.
    
    Modification:
    - `Flow.internalConcat` (Scala DSL): when `detached = false` (i.e.
      `concatLazy`) and `TraversalBuilder.getValuePresentedSource` recognises
      a known kind, fuse with a dedicated lightweight stage instead of
      `Concat`. When `detached = true` (i.e. `concat`), fall through to
      `concatGraph` to keep the eager-pull / prefetch / cycle-deadlock
      contract intact. New stages live under `pekko.stream.impl`:
      `SingleConcat`, `IterableConcat`, `JavaStreamConcat`, `RepeatConcat`,
      `FailedConcat`, `FutureConcat`. `Source.empty` short-circuits to the
      upstream Source unchanged. The inlined source's wrapping attributes
      (from `Source.foo(...).withAttributes(...)`) are carried over to the
      optimized stage via `addAttributes(other.traversalBuilder.attributes)`
      so any user-supplied `SupervisionStrategy`, dispatcher hint, log level,
      or stage name still applies on the fast path.
    - `FailedConcat` calls `failStage(failure)` from `preStart` to mirror
      `FailedSource.preStart` semantics. The existing `concatGraph` path
      materialises `FailedSource` as a substream that fails immediately and
      `Concat`'s `onUpstreamFailure` propagates eagerly; keeping the same
      eager-failure timing avoids hangs on inputs that never complete (e.g.
      `Source.never.concatLazy(Source.failed(ex))`).
    - `JavaStreamConcat` opens the underlying `BaseStream` in `preStart` to
      match `JavaStreamSource.preStart`. Side effects of `open()` (file /
      network resource acquisition) and any exceptions from `open()` happen
      at materialization time, matching the existing `concatGraph` path. The
      stream is closed in `postStop` so cancellation, exhaustion, iterator
      failure, and stage failure all release the resource (`stream` typed
      `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching
      #2977's fix in `5d03002142`).
    - `FutureConcat` registers the async callback in `preStart`, mirroring
      `FutureSource.preStart`: a pending-future failure surfaces eagerly via
      `failStage(ex)` even while upstream is still active (otherwise
      `Source.never.concatLazy(Source.future(failingFuture))` would hang). A
      successful value is buffered in `futureResult` and emitted only after
      `onUpstreamFinish`, preserving concat ordering. `Future.value` is used
      for the already-completed fast path. The pending path swaps the `out`
      handler to a no-op while waiting for the callback so we don't pull the
      now-closed `in`. `Success(null)` calls `completeStage()` (no null
      forwarded), staying in lock-step with
      `Source.future(Future.successful(null))`
      rerouting to `Source.empty` and `InflightSources.hasFutureElement`
      treating `Success(null)` as no-element. `DO NOT CHANGE` comments
      capture both invariants.
    - `IterableConcat` reads
      `inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider`
      and applies it on iterator-creation / iterator-iteration failure,
      mirroring `IterableSource` / `IteratorSource` semantics. Without this,
      a Resume / Restart decider attached to the inlined source via
      `withAttributes(supervisionStrategy(...))` would silently degrade to
      Stop on the optimized path. Restart re-invokes the iterator factory and
      resumes emission with a fresh iterator.
    - Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`,
      `IterableConcat` for iterables, ranges, and iterators, `JavaStreamConcat`,
      `RepeatConcat`, `FailedConcat`, `FutureConcat`), Java-stream `BaseStream.close()` on
      both exhaustion and downstream cancellation (`take(2)` mid-stream),
      pending future resolved with `Success(null)` treated as completion,
      pending future resolved with `Failure(_)` failing the stream, eager
      failure for `Source.never.concatLazy(Source.failed(ex))`, eager failure
      for `Source.never.concatLazy(Source.future(pendingPromise))` when the
      promise later fails, supervision parity for the `IterableConcat` fast
      path (Resume-skips, Restart-recreates, default-Stop-fails), and the
      detached-gating: a directional test asserts the fast path is not taken
      for `concat` (detached = true) and another asserts that `concat`
      preserves Detacher's eager pull side-effect timing for an
      `IteratorSource` factory while `concatLazy` defers it. Existing eager
      / concatLazy parity matrix is preserved by gating each
      `pendingBuilder.toString` assertion on `!eager`.
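    
    Illustration only (not part of the change): the gating described in the
    first bullet can be pictured with the toy model below. The `Kind` types,
    `ConcatDispatchSketch`, and the returned labels are hypothetical
    stand-ins, not Pekko internals. Treating an unrecognised source as
    falling back to `concatGraph` is an assumption drawn from the `map()`
    comment in the java-stream test.

    ```scala
    // Toy model: hypothetical types, not the real TraversalBuilder API.
    sealed trait Kind
    case object EmptyKind extends Kind
    final case class SingleKind(value: Any) extends Kind
    case object IterableKind extends Kind
    case object JavaStreamKind extends Kind
    final case class RepeatKind(value: Any) extends Kind
    case object FailedKind extends Kind
    case object FutureKind extends Kind

    object ConcatDispatchSketch {
      // `recognised` stands in for what getValuePresentedSource would report.
      def choose(detached: Boolean, recognised: Option[Kind]): String =
        if (detached) "concatGraph" // eager `concat` keeps Concat + Detacher
        else
          recognised match {
            case Some(EmptyKind)      => "upstream unchanged"
            case Some(SingleKind(v))  => s"SingleConcat($v)"
            case Some(IterableKind)   => "IterableConcat"
            case Some(JavaStreamKind) => "JavaStreamConcat"
            case Some(RepeatKind(v))  => s"RepeatConcat($v)"
            case Some(FailedKind)     => "FailedConcat"
            case Some(FutureKind)     => "FutureConcat"
            case None                 => "concatGraph" // assumed fallback
          }
    }
    ```

    With this model, `choose(detached = true, Some(IterableKind))` yields
    `concatGraph`, while the same source with `detached = false` yields
    `IterableConcat`, which is the split the detached-gating test checks.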
    
    Result:
    The lazy concat operator takes the same value-presented fast path as
    #2977's flatten operators while the eager / detached `concat` keeps its
    documented Detacher semantics. No substream is materialized for the
    inlined kinds on the lazy path; the Java-stream resource is closed
    deterministically; the failed / future eager-failure timing matches the
    existing concat-graph path so inputs that never complete cannot hang;
    the pending-future null path is consistent with `Source.future` and the
    flatten inflight path; supervision deciders attached to inlined
    iterables / iterators flow through to the optimized stage; and the
    detached-true contract is preserved bit-for-bit by falling through to
    `concatGraph`. All FlowConcat suites pass, plus all FlattenMerge /
    FlatMapConcat regression tests.
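    
    Illustration only (not part of the change): the future-related
    guarantees summarised above can be sketched as a small decision
    function. `FutureConcatSketch`, `Action`, and `decide` are hypothetical
    names for this sketch; the real stage drives the same decisions through
    `preStart`, an async callback, and `onUpstreamFinish`.

    ```scala
    import scala.util.{ Failure, Success, Try }

    // Toy model: hypothetical names, not the real FutureConcat stage.
    object FutureConcatSketch {
      sealed trait Action
      case object ForwardUpstream extends Action // upstream still active
      case object AwaitFuture extends Action     // upstream done, future pending
      case object CompleteStage extends Action
      final case class EmitThenComplete(value: String) extends Action
      final case class FailStage(cause: Throwable) extends Action

      // `result` is None while the future is pending.
      def decide(upstreamFinished: Boolean, result: Option[Try[String]]): Action =
        result match {
          // A failure surfaces eagerly, even while upstream is still active.
          case Some(Failure(ex))      => FailStage(ex)
          // A success is only buffered until upstream finishes.
          case _ if !upstreamFinished => ForwardUpstream
          case None                   => AwaitFuture
          // A null success means "no element": complete without emitting.
          case Some(Success(null))    => CompleteStage
          case Some(Success(v))       => EmitThenComplete(v)
        }
    }
    ```

    The first case is what keeps a never-completing upstream from hanging
    when the future fails; the `Success(null)` case matches the null
    handling described for `Source.future`.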
    
    Tests: stream-tests/testOnly *FlowConcat*Spec *Concat* *FlattenMerge* *FlatMapConcat*
    References: PR #2977 (value-presented optimization for FlattenConcat /
    FlattenMerge); PR #2978 (this work)
    
    * .
---
 .../pekko/stream/scaladsl/FlowConcatSpec.scala     | 309 ++++++++++++++++++++-
 .../apache/pekko/stream/impl/FailedConcat.scala    |  60 ++++
 .../apache/pekko/stream/impl/FutureConcat.scala    | 100 +++++++
 .../apache/pekko/stream/impl/IterableConcat.scala  | 108 +++++++
 .../pekko/stream/impl/JavaStreamConcat.scala       |  80 ++++++
 .../apache/pekko/stream/impl/RepeatConcat.scala    |  59 ++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    |  54 +++-
 7 files changed, 763 insertions(+), 7 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala
index acae1ae812..2b2deb052c 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala
@@ -13,14 +13,21 @@
 
 package org.apache.pekko.stream.scaladsl
 
+import java.util.Collections
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.Await
+import scala.concurrent.Future
 import scala.concurrent.Promise
 import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
 
 import org.apache.pekko
 import pekko.NotUsed
+import pekko.stream.ActorAttributes.supervisionStrategy
+import pekko.stream.KillSwitches
+import pekko.stream.Supervision.{ restartingDecider, resumingDecider }
 import pekko.stream.testkit.BaseTwoStreamsSetup
 import pekko.stream.testkit.TestPublisher
 import pekko.stream.testkit.TestSubscriber
@@ -242,11 +249,309 @@ abstract class AbstractFlowConcatSpec extends BaseTwoStreamsSetup {
       val s2 = Source.single(2)
       val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
 
-      // avoids digging too deap into the traversal builder
-      concat.traversalBuilder.pendingBuilder.toString should include("SingleConcat(2)")
+      // avoids digging too deep into the traversal builder; fast path is gated on detached=false.
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("SingleConcat(2)")
 
       concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
     }
+
+    "optimize iterable concat" in {
+      val s1 = Source.single(1)
+      val s2 = Source(List(2, 3, 4))
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 4))
+    }
+
+    "optimize range concat" in {
+      val s1 = Source.single(1)
+      val s2 = Source(2 to 4)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 4))
+    }
+
+    "optimize iterator concat" in {
+      val s1 = Source.single(1)
+      val s2 = Source.fromIterator(() => Iterator(2, 3, 4))
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 4))
+    }
+
+    "optimize java-stream concat" in {
+      val s1 = Source.single(1)
+      val s2 = Source.fromJavaStream(() => Collections.singleton(2: Integer).stream()).map(_.intValue())
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      // map() is not value-presented, so the optimization should not kick in for s2 here.
+      // To exercise the optimization, build a JavaStream source whose value-presented form survives.
+      val s2Direct = Source.fromJavaStream(() => Collections.singleton(2: Integer).stream())
+      val concatDirect: Source[Integer, _] =
+        if (eager) Source.single[Integer](1).concat(s2Direct) else Source.single[Integer](1).concatLazy(s2Direct)
+      if (!eager) concatDirect.traversalBuilder.pendingBuilder.toString should include("JavaStreamConcat")
+
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
+    }
+
+    "close the underlying java-stream after java-stream concat completes" in {
+      // Resource hygiene parity with InflightJavaStreamSource: BaseStream.close()
+      // must run on stage exhaustion (mirrors postStop in JavaStreamConcat).
+      val closed = new AtomicBoolean(false)
+      val s1 = Source.single(1)
+      val s2 = Source.fromJavaStream { () =>
+        Collections.singleton(2: Integer).stream().onClose(new Runnable {
+          override def run(): Unit = closed.set(true)
+        })
+      }
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("JavaStreamConcat")
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
+      awaitAssert {
+        closed.get() shouldBe true
+      }
+    }
+
+    "close the underlying java-stream when downstream cancels mid java-stream concat" in {
+      val closed = new AtomicBoolean(false)
+      val s1 = Source.single(1)
+      val s2 = Source.fromJavaStream { () =>
+        java.util.stream.IntStream.rangeClosed(2, 1000).boxed().asInstanceOf[java.util.stream.Stream[Integer]]
+          .onClose(new Runnable {
+            override def run(): Unit = closed.set(true)
+          })
+      }
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("JavaStreamConcat")
+      // take(2) only consumes the upstream element + first element of the java-stream, then cancels.
+      concat.take(2).runWith(Sink.seq).futureValue should ===(Seq(1, 2))
+      // postStop runs asynchronously after take(2)'s cancel propagates upstream.
+      awaitAssert {
+        closed.get() shouldBe true
+      }
+    }
+
+    "close the underlying java-stream when iterator.next() throws in java-stream concat" in {
+      // Resource hygiene parity with InflightJavaStreamSource: even when user code
+      // in iterator.next() throws, postStop must close the underlying BaseStream.
+      val closed = new AtomicBoolean(false)
+      val ex = new RuntimeException("iterator-boom") with NoStackTrace
+      val s1 = Source.single(1)
+      val s2 = Source.fromJavaStream { () =>
+        // Build a stream whose iterator.next() throws on the first call.
+        java.util.stream.Stream.of[Integer](2: Integer).map[Integer](new java.util.function.Function[Integer, Integer] {
+          override def apply(t: Integer): Integer = throw ex
+        }).onClose(new Runnable {
+          override def run(): Unit = closed.set(true)
+        })
+      }
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("JavaStreamConcat")
+      concat.runWith(Sink.seq).failed.futureValue should ===(ex)
+      awaitAssert {
+        closed.get() shouldBe true
+      }
+    }
+
+    "optimize repeat concat" in {
+      val s1 = Source(1 to 3)
+      val s2 = Source.repeat(0)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("RepeatConcat(0)")
+      concat.take(6).runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 0, 0, 0))
+    }
+
+    "optimize failed concat" in {
+      val ex = new RuntimeException("boom") with NoStackTrace
+      val s1 = Source.single(1)
+      val s2: Source[Int, NotUsed] = Source.failed(ex)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FailedConcat")
+      concat.runWith(Sink.seq).failed.futureValue should ===(ex)
+    }
+
+    "optimize completed-future concat" in {
+      // `Source.future(Future.successful(x))` is itself optimized to a `SingleSource`,
+      // so the dispatch lands on `SingleConcat` rather than `FutureConcat`.
+      val s1 = Source.single(1)
+      val s2 = Source.future(Future.successful(2))
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("SingleConcat(2)")
+      concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
+    }
+
+    "optimize pending-future concat" in {
+      val promise = Promise[Int]()
+      val s1 = Source.single(1)
+      val s2 = Source.future(promise.future)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FutureConcat")
+      val resultF = concat.runWith(Sink.seq)
+      promise.success(2)
+      resultF.futureValue should ===(Seq(1, 2))
+    }
+
+    "treat pending-future resolved with null as completion in concat" in {
+      // Lockstep with `Source.future(Future.successful(null))` (rerouted to `Source.empty`)
+      // and `InflightSources.hasFutureElement`: a pending future that later resolves
+      // with `null` must be treated as completion, never emitted.
+      val promise = Promise[String]()
+      val s1 = Source.single("a")
+      val s2 = Source.future(promise.future)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FutureConcat")
+      val resultF = concat.runWith(Sink.seq)
+      promise.success(null)
+      resultF.futureValue should ===(Seq("a"))
+    }
+
+    "fail the stream when pending-future resolves with failure in concat" in {
+      val ex = new RuntimeException("pending-future-boom") with NoStackTrace
+      val promise = Promise[Int]()
+      val s1 = Source.single(1)
+      val s2 = Source.future(promise.future)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FutureConcat")
+      val resultF = concat.runWith(Sink.seq)
+      promise.failure(ex)
+      resultF.failed.futureValue should ===(ex)
+    }
+
+    "optimize failed-future concat" in {
+      // `Source.future(Future.failed(ex))` is itself optimized to a `FailedSource`,
+      // so the dispatch lands on `FailedConcat` rather than `FutureConcat`.
+      val ex = new RuntimeException("future-boom") with NoStackTrace
+      val s1 = Source.single(1)
+      val s2 = Source.future(Future.failed[Int](ex))
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FailedConcat")
+      concat.runWith(Sink.seq).failed.futureValue should ===(ex)
+    }
+
+    "fail eagerly when concat with a failed source even if upstream never finishes" in {
+      // Mirrors `FailedSource.preStart` semantics: the failure must surface at materialization
+      // time, otherwise `Source.never.concat(Source.failed(ex))` would hang.
+      val ex = new RuntimeException("failed-source-eager") with NoStackTrace
+      val concat = if (eager) Source.never[Int].concat(Source.failed(ex))
+      else Source.never[Int].concatLazy(Source.failed(ex))
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FailedConcat")
+      concat.runWith(Sink.head).failed.futureValue should ===(ex)
+    }
+
+    "fail eagerly when concat with a pending future that fails before upstream finishes" in {
+      // Mirrors `FutureSource.preStart` semantics: a pending-future failure must propagate
+      // even if upstream is still active. Without eager failure, a never-completing upstream
+      // combined with a failing future would hang.
+      val ex = new RuntimeException("pending-future-eager") with NoStackTrace
+      val promise = Promise[Int]()
+      val concat = if (eager) Source.never[Int].concat(Source.future(promise.future))
+      else Source.never[Int].concatLazy(Source.future(promise.future))
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FutureConcat")
+      val resultF = concat.runWith(Sink.head)
+      promise.failure(ex)
+      resultF.failed.futureValue should ===(ex)
+    }
+
+    "honor supervision Resume decider when the inlined iterator throws" in {
+      // Mirrors `IteratorSource.handleIteratorFailure`: a Resume decider must skip the
+      // throwing element and continue, instead of failing the stage. Drives the IterableConcat
+      // fast path for `Source.fromIterator(...).withAttributes(supervisionStrategy(...))`.
+      val ex = new RuntimeException("iter-resume-3") with NoStackTrace
+      val s2 = Source
+        .fromIterator(() => Iterator(1, 2, 3, 4, 5).map(v => if (v == 3) throw ex else v))
+        .withAttributes(supervisionStrategy(resumingDecider))
+      val concat = if (eager) Source.single(0).concat(s2) else Source.single(0).concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+      concat.runWith(Sink.seq).futureValue should ===(Seq(0, 1, 2, 4, 5))
+    }
+
+    "honor supervision Restart decider by recreating the iterator after throw" in {
+      // Mirrors `IteratorSource.handleIteratorFailure` with Restart: on iterator failure,
+      // the iterator factory is invoked again and emission continues with the fresh iterator.
+      val ex = new RuntimeException("iter-restart") with NoStackTrace
+      val attempt = new AtomicInteger(0)
+      val factory: () => Iterator[Int] = () => {
+        val n = attempt.incrementAndGet()
+        if (n == 1) Iterator(1, 2, 3).map(v => if (v == 2) throw ex else v)
+        else Iterator(4, 5)
+      }
+      val s2 = Source.fromIterator(factory).withAttributes(supervisionStrategy(restartingDecider))
+      val concat = if (eager) Source.single(0).concat(s2) else Source.single(0).concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+      concat.runWith(Sink.seq).futureValue should ===(Seq(0, 1, 4, 5))
+    }
+
+    "fail when the inlined iterator throws and the default (Stop) decider applies" in {
+      // Default decider is `stoppingDecider`; without an explicit `withAttributes`, an
+      // iterator throw must fail the concat stream — same as `IterableSource` default behaviour.
+      val ex = new RuntimeException("iter-stop") with NoStackTrace
+      val s2 = Source.fromIterator(() => Iterator(1, 2, 3).map(v => if (v == 2) throw ex else v))
+      val concat = if (eager) Source.single(0).concat(s2) else Source.single(0).concatLazy(s2)
+
+      if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+      concat.runWith(Sink.seq).failed.futureValue should ===(ex)
+    }
+
+    "gate the value-presented fast path on detached=false (concatLazy only)" in {
+      // The fast path is intentionally bypassed for `concat` (detached = true) so that the
+      // `Concat(_, detachedInputs = true)` + `Detacher` semantics — eager pull at
+      // materialization, one-element prefetch buffer, deadlock-breaking for cyclic graphs —
+      // remain in force. `concatLazy` (detached = false) does take the inlined fast path.
+      val s1 = Source.single(1)
+      val s2 = Source(List(2, 3))
+      val eagerStr = s1.concat(s2).traversalBuilder.pendingBuilder.toString
+      val lazyStr = s1.concatLazy(s2).traversalBuilder.pendingBuilder.toString
+      (eagerStr should not).include("IterableConcat")
+      lazyStr should include("IterableConcat")
+    }
+
+    "preserve `concat` (detached=true) eager-pull side-effect timing for IteratorSource" in {
+      // `Detacher.preStart` does `tryPull(in)` which causes IteratorSource.onPull to invoke
+      // the user-supplied factory at materialization time, even if the LHS upstream never
+      // finishes. The fast path inlines IteratorSource into IterableConcat which delays the
+      // factory call until upstream finishes; gating on detached=false keeps detached
+      // semantics intact for `concat`.
+      val factoryRan = Promise[Unit]()
+      val factory: () => Iterator[Int] = () => {
+        factoryRan.trySuccess(())
+        Iterator.empty[Int]
+      }
+      val s1 = Source.never[Int]
+      val s2 = Source.fromIterator(factory)
+      val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+      val (killSwitch, _) =
+        concat.viaMat(KillSwitches.single)(Keep.right).toMat(Sink.ignore)(Keep.both).run()
+      try {
+        if (eager) {
+          // Factory must run at materialization (Detacher's eager pull).
+          factoryRan.future.futureValue
+        } else {
+          // Factory must NOT run while LHS (Source.never) hasn't completed.
+          try {
+            Await.ready(factoryRan.future, 200.millis)
+            fail("Factory unexpectedly ran with concatLazy")
+          } catch {
+            case _: java.util.concurrent.TimeoutException => () // expected
+          }
+        }
+      } finally {
+        killSwitch.shutdown()
+      }
+    }
   }
 }
 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala
new file mode 100644
index 0000000000..00efb5f40d
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
+
+/**
+ * Concatenating a `Source.failed` to a stream is common enough that it warrants this
+ * optimization which avoids the actual fan-out for such cases. Mirrors `FailedSource`
+ * timing semantics: `preStart()` calls `failStage(failure)`, so the failure surfaces
+ * at materialization time even if upstream is still active (e.g. `Source.never`).
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class FailedConcat[E](failure: Throwable) extends GraphStage[FlowShape[E, E]] {
+
+  val in = Inlet[E]("FailedConcat.in")
+  val out = Outlet[E]("FailedConcat.out")
+
+  override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      // DO NOT CHANGE
+      // WHY: matches FailedSource.preStart which fails at materialization time. The
+      // existing concatGraph path materializes FailedSource as a substream whose
+      // preStart fails immediately and Concat's onUpstreamFailure propagates it
+      // eagerly. Keeping the same eager-failure timing avoids hangs for inputs that
+      // never complete (e.g. Source.never.concat(Source.failed(ex))).
+      override def preStart(): Unit = failStage(failure)
+
+      // Handlers are required by GraphStageLogic; preStart fails the stage before
+      // any of these can fire.
+      override def onPush(): Unit = ()
+      override def onPull(): Unit = ()
+
+      setHandlers(in, out, this)
+    }
+
+  override def toString: String = s"FailedConcat($failure)"
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala
new file mode 100644
index 0000000000..509fb1cf21
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.util.{ Failure, Success, Try }
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
+
+/**
+ * Concatenating a `Source.future` to a stream is common enough that it warrants this
+ * optimization which avoids the actual fan-out for such cases. Mirrors `FutureSource`
+ * timing semantics: the future callback is registered in `preStart()` (so a Failure
+ * surfaces eagerly even while upstream is still active), while a successful value is
+ * buffered and emitted only after upstream finishes — preserving concat ordering.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class FutureConcat[E](future: Future[E]) extends GraphStage[FlowShape[E, E]] {
+
+  val in = Inlet[E]("FutureConcat.in")
+  val out = Outlet[E]("FutureConcat.out")
+
+  override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      // null sentinel = not yet resolved (cheaper than Option allocation per stage)
+      private var futureResult: Try[E] = null
+      private var upstreamFinished: Boolean = false
+
+      // DO NOT CHANGE
+      // WHY: matches FutureSource.preStart timing. A pending future failure must
+      // surface at materialization time, not after upstream completes — otherwise
+      // `Source.never.concat(Source.future(failingFuture))` would hang. Successful
+      // values are buffered until upstream finishes to preserve concat ordering.
+      override def preStart(): Unit =
+        future.value match {
+          case Some(Failure(ex))   => failStage(ex)
+          case completed @ Some(_) => futureResult = completed.get
+          case None                =>
+            val cb = getAsyncCallback[Try[E]] {
+              case Failure(ex) => failStage(ex)
+              case other       =>
+                futureResult = other
+                if (upstreamFinished) emitOrComplete(other)
+            }.invoke _
+            future.onComplete(cb)(ExecutionContext.parasitic)
+        }
+
+      override def onPush(): Unit = push(out, grab(in))
+
+      override def onPull(): Unit = pull(in)
+
+      override def onUpstreamFinish(): Unit = {
+        upstreamFinished = true
+        if (futureResult ne null) emitOrComplete(futureResult)
+        else
+          // Avoid pulling the now-closed `in` while the future is pending; the
+          // async callback above will emit/fail when it fires.
+          setHandler(out, new OutHandler { override def onPull(): Unit = () })
+      }
+
+      private def emitOrComplete(result: Try[E]): Unit = result match {
+        // DO NOT CHANGE
+        // WHY: Source.future(Future.successful(null)) is rerouted upstream to Source.empty,
+        // and InflightSources.hasFutureElement treats Success(null) as no-element. This
+        // branch keeps the optimized concat fast-path consistent with both — emitting
+        // null here would violate Reactive Streams' no-null rule and diverge from the
+        // materialized FutureSource path. Update in lock-step if Source.future ever
+        // changes its null treatment.
+        case Success(null) => completeStage()
+        case Success(v)    => emit(out, v, () => completeStage())
+        case Failure(ex)   => failStage(ex)
+      }
+
+      setHandlers(in, out, this)
+    }
+
+  override def toString: String = "FutureConcat"
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala
new file mode 100644
index 0000000000..ddaeddc933
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet, Supervision }
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
+
+/**
+ * Concatenating an iterable / iterator / range source to a stream is common enough
+ * that it warrants this optimization which avoids the actual fan-in for value-presented
+ * sources. Java-stream-backed sources are handled by [[JavaStreamConcat]] because they
+ * also need deterministic `BaseStream.close()` on stage termination.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class IterableConcat[E](createIterator: () => Iterator[E]) extends GraphStage[FlowShape[E, E]] {
+
+  val in = Inlet[E]("IterableConcat.in")
+  val out = Outlet[E]("IterableConcat.out")
+
+  override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      // DO NOT CHANGE
+      // WHY: matches `IterableSource` / `IteratorSource` supervision semantics so
+      // `Source.iterable / fromIterator(...).withAttributes(supervisionStrategy(...))`
+      // behaves identically whether dispatched through this fast path or through the
+      // substream-materializing concatGraph path. Without this, a Resume / Restart
+      // decider attached to the inlined source would silently degrade to Stop.
+      private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+      private var currentIterator: Iterator[E] = _
+      private var iterating: Boolean = false
+
+      override def onPush(): Unit = push(out, grab(in))
+
+      override def onPull(): Unit =
+        if (!iterating) pull(in)
+        else tryPushNextOrComplete()
+
+      override def onUpstreamFinish(): Unit = {
+        iterating = true
+        if (isAvailable(out)) tryPushNextOrComplete()
+      }
+
+      private def tryPushNextOrComplete(): Unit =
+        try {
+          if (currentIterator eq null) currentIterator = createIterator()
+          if (currentIterator.hasNext) {
+            if (isAvailable(out)) {
+              push(out, currentIterator.next())
+              if (!currentIterator.hasNext) completeStage()
+            }
+          } else {
+            completeStage()
+          }
+        } catch {
+          case NonFatal(ex) =>
+            if (currentIterator eq null) handleIteratorCreationFailure(ex)
+            else handleIteratorFailure(ex)
+        }
+
+      private def handleIteratorCreationFailure(ex: Throwable): Unit = decider(ex) match {
+        case Supervision.Stop    => failStage(ex)
+        case Supervision.Resume  => completeStage()
+        case Supervision.Restart =>
+          try {
+            currentIterator = createIterator()
+            tryPushNextOrComplete()
+          } catch {
+            case NonFatal(restartEx) => failStage(restartEx)
+          }
+      }
+
+      private def handleIteratorFailure(ex: Throwable): Unit = decider(ex) match {
+        case Supervision.Stop    => failStage(ex)
+        case Supervision.Resume  => tryPushNextOrComplete()
+        case Supervision.Restart =>
+          currentIterator = createIterator()
+          tryPushNextOrComplete()
+      }
+
+      setHandlers(in, out, this)
+    }
+
+  override def toString: String = "IterableConcat"
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamConcat.scala
new file mode 100644
index 0000000000..7b399b43a5
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamConcat.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import scala.jdk.CollectionConverters._
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
+
+/**
+ * Concatenating a `Source.fromJavaStream` to a stream is common enough that it warrants
+ * this optimization which avoids the actual fan-in for such cases. Mirrors the
+ * resource-management contract of [[JavaStreamSource]]: the underlying `BaseStream` is
+ * opened in `preStart` (so any side effects / exceptions from `open()` happen at
+ * materialization time, matching the existing concatGraph path) and is guaranteed to
+ * be closed via `postStop` on exhaustion, on iterator failure, on downstream cancel,
+ * and on stage failure.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class JavaStreamConcat[E](open: () => java.util.stream.BaseStream[E, _])
+    extends GraphStage[FlowShape[E, E]] {
+
+  val in = Inlet[E]("JavaStreamConcat.in")
+  val out = Outlet[E]("JavaStreamConcat.out")
+
+  override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      private var stream: java.util.stream.BaseStream[E, _] = _
+      private var iterator: Iterator[E] = _
+
+      // DO NOT CHANGE
+      // WHY: matches JavaStreamSource.preStart which opens the stream at materialization
+      // time. Side effects of `open()` (e.g. file/network resource acquisition) and any
+      // exceptions it throws happen at the same point as the existing concatGraph path.
+      // Deferring open() to onUpstreamFinish would observably delay these effects.
+      override def preStart(): Unit = {
+        stream = open()
+        iterator = stream.iterator().asScala
+      }
+
+      override def onPush(): Unit = push(out, grab(in))
+
+      override def onPull(): Unit = pull(in)
+
+      override def onUpstreamFinish(): Unit =
+        emitMultiple(out, iterator, () => completeStage())
+
+      override def postStop(): Unit =
+        if (stream ne null) {
+          try stream.close()
+          catch { case NonFatal(_) => () }
+        }
+
+      setHandlers(in, out, this)
+    }
+
+  override def toString: String = "JavaStreamConcat"
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/RepeatConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/RepeatConcat.scala
new file mode 100644
index 0000000000..2ea912e0a1
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/RepeatConcat.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
+
+/**
+ * Concatenating a `Source.repeat` to a stream is common enough that it warrants this
+ * optimization which avoids the actual fan-in for such cases. After upstream
+ * finishes, the stage indefinitely emits the cached element on every pull until
+ * downstream cancels.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class RepeatConcat[E](elem: E) extends GraphStage[FlowShape[E, E]] {
+
+  val in = Inlet[E]("RepeatConcat.in")
+  val out = Outlet[E]("RepeatConcat.out")
+
+  override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogic(shape) with InHandler with OutHandler {
+      override def onPush(): Unit = push(out, grab(in))
+
+      override def onPull(): Unit = pull(in)
+
+      override def onUpstreamFinish(): Unit = {
+        setHandler(out,
+          new OutHandler {
+            override def onPull(): Unit = push(out, elem)
+          })
+        if (isAvailable(out)) push(out, elem)
+      }
+
+      setHandlers(in, out, this)
+    }
+
+  override def toString: String = s"RepeatConcat($elem)"
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 6e498d1ca5..c670c8c76a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -32,8 +32,15 @@ import pekko.event.LoggingAdapter
 import pekko.event.MarkerLoggingAdapter
 import pekko.stream._
 import pekko.stream.Attributes.SourceLocation
+import pekko.stream.impl.FailedConcat
+import pekko.stream.impl.FailedSource
+import pekko.stream.impl.FutureConcat
+import pekko.stream.impl.IterableConcat
+import pekko.stream.impl.JavaStreamConcat
+import pekko.stream.impl.JavaStreamSource
 import pekko.stream.impl.LinearTraversalBuilder
 import pekko.stream.impl.ProcessorModule
+import pekko.stream.impl.RepeatConcat
 import pekko.stream.impl.SetupFlowStage
 import pekko.stream.impl.SingleConcat
 import pekko.stream.impl.Stages.DefaultAttributes
@@ -44,6 +51,7 @@ import pekko.stream.impl.TraversalBuilder
 import pekko.stream.impl.fusing
 import pekko.stream.impl.fusing._
 import pekko.stream.impl.fusing.FlattenMerge
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, SingleSource }
 import pekko.stream.stage._
 import pekko.util.ConstantFun
 import pekko.util.OptionVal
@@ -3808,11 +3816,47 @@ trait FlowOps[+Out, +Mat] {
     that match {
       case source if TraversalBuilder.isEmptySource(source) => this.asInstanceOf[Repr[U]]
       case other                                            =>
-        TraversalBuilder.getSingleSource(other) match {
-          case OptionVal.Some(singleSource) =>
-            via(new SingleConcat(singleSource.elem.asInstanceOf[U]))
-          case _ => via(concatGraph(other, detached))
-        }
+        // DO NOT CHANGE
+        // WHY: only `concatLazy` (detached = false) takes the inlined fast path.
+        // `concat` (detached = true) relies on `Concat(_, detachedInputs = true)` which
+        // wraps each input port with `Detacher`. Detacher.preStart does an eager
+        // `tryPull(in)`, which (a) pulls the LHS upstream at materialization, (b) pulls
+        // the RHS source eagerly so its first element is produced and buffered before
+        // the LHS finishes, and (c) provides a one-element buffer that decouples
+        // upstream/downstream demand (also breaks deadlocks in cyclic graphs).
+        // Inlining the RHS into a single linear stage drops both eager-pull side-effect
+        // timing and the buffer, so for `detached = true` we keep the substream path.
+        if (!detached) {
+          TraversalBuilder.getValuePresentedSource(other) match {
+            case OptionVal.Some(graph) =>
+              // Attributes attached to the inlined source via `withAttributes` (notably
+              // `SupervisionStrategy`, dispatcher hints, log levels) are carried over to
+              // the optimized stage so the value-presented fast path stays behaviourally
+              // identical to the substream-materializing concatGraph path.
+              val sourceAttrs = other.traversalBuilder.attributes
+              graph match {
+                case single: SingleSource[U] @unchecked =>
+                  via(new SingleConcat(single.elem).addAttributes(sourceAttrs))
+                case iterable: IterableSource[U] @unchecked =>
+                  via(new IterableConcat[U](() => iterable.elements.iterator).addAttributes(sourceAttrs))
+                case iterator: IteratorSource[U] @unchecked =>
+                  via(new IterableConcat[U](iterator.createIterator).addAttributes(sourceAttrs))
+                case range: RangeSource[U] @unchecked =>
+                  via(new IterableConcat[U](() => range.range.iterator.asInstanceOf[Iterator[U]]).addAttributes(
+                    sourceAttrs))
+                case javaStream: JavaStreamSource[U, _] @unchecked =>
+                  via(new JavaStreamConcat[U](javaStream.open).addAttributes(sourceAttrs))
+                case repeat: RepeatSource[U] @unchecked =>
+                  via(new RepeatConcat[U](repeat.elem).addAttributes(sourceAttrs))
+                case futureSource: FutureSource[U] @unchecked =>
+                  via(new FutureConcat[U](futureSource.future).addAttributes(sourceAttrs))
+                case failed: FailedSource[U] @unchecked =>
+                  via(new FailedConcat[U](failed.failure).addAttributes(sourceAttrs))
+                case _ => via(concatGraph(other, detached))
+              }
+            case _ => via(concatGraph(other, detached))
+          }
+        } else via(concatGraph(other, detached))
     }
 
   private def internalConcatAll[U >: Out](those: Array[Graph[SourceShape[U], _]], detached: Boolean): Repr[U] =

