This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 7429728805 optimize: FlattenMerge avoids substream materialization for value-presented sources (#2977)
7429728805 is described below

commit 7429728805bc775f2326dde675a1e66318dd6393
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Wed May 20 10:59:23 2026 +0800

    optimize: FlattenMerge avoids substream materialization for value-presented sources (#2977)
    
    * optimize: FlattenMerge avoids substream materialization for value-presented sources
    
    Motivation:
    FlattenMerge previously only fast-pathed `SingleSource`. For all other
    inner sources -- `Source.empty`, `Source(List)`, `Source.fromJavaStream`,
    `Source.future(Future.successful(...))`, range/iterator/repeat sources --
    each one paid the cost of materializing a `SubSinkInlet` and
    `subFusingMaterializer.materialize(...)`. FlattenConcat already does this
    optimization via `TraversalBuilder.getValuePresentedSource`; FlattenMerge
    should benefit too, especially because the single-arg `flatMapConcat(f)`
    internally uses `FlattenMerge(1)` and so depends on FlattenMerge for its
    hot path.
    
    Modification:
    - Generalize FlattenMerge to dispatch on `getValuePresentedSource`
      (instead of `getSingleSource`) and consume `SingleSource`,
      `IterableSource`, `IteratorSource`, `RangeSource`, `RepeatSource`,
      `JavaStreamSource`, `FutureSource`, `FailedSource`, and empty sources
      in-place without materialization, mirroring FlattenConcat.
    - Add an `InflightSource[T]` family inside the new `FlattenMerge`
      companion to occupy a breadth slot for multi-element value-presented
      sources. Track them via a new `pendingInflightSources` counter so
      `activeSources` correctly bounds the breadth budget.
    - Preserve merge semantics: when an inflight source still has more
      elements after a push, re-enqueue it so other concurrent sources keep
      interleaving (instead of draining one source first, which is the
      concat behaviour).
    - Fold completed `Future`s and `FailedSource` directly: success pushes
      or queues a single element, failure calls `failStage`.
    - Pending `Future`s register a callback via `getAsyncCallback` and
      occupy a breadth slot until completion.
    - Empty inner sources are discarded in place (no slot consumed).
    
    Result:
    - `flatMapMerge(breadth, ...)` and the default `flatMapConcat(...)`
      (which routes through `FlattenMerge(1)`) skip substream materialization
      for value-presented inner sources, reducing per-source GC and stage
      overhead.
    - All existing FlattenMerge / flatMapConcat tests pass; new tests cover
      empty / single / iterable / range / java stream / completed and
      delayed future / failed inner sources across breadth = 1..128.
    - Internal API only (`@InternalApi private[pekko]`); MiMa is clean.
    
    References:
    - The optimization mirrors FlattenConcat's value-presented-source
      handling introduced in 1.2.0.
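
    The "preserve merge semantics" point above can be pictured with a
    simplified model in plain Scala. This is only a sketch: it ignores
    demand, the breadth limit, and all Pekko internals, and the names
    `MergeInterleaveSketch`, `mergeStyle`, and `concatStyle` are hypothetical.
    The real operator does not promise this exact output order.

```scala
import scala.collection.mutable

object MergeInterleaveSketch {
  // Simplified model: every non-empty inner source is an Iterator holding one slot.
  def mergeStyle[T](sources: List[Iterator[T]]): List[T] = {
    val queue = mutable.Queue.empty[Iterator[T]]
    // Empty inner sources are discarded in place and never occupy a slot.
    sources.foreach(it => if (it.hasNext) queue.enqueue(it))
    val out = List.newBuilder[T]
    while (queue.nonEmpty) {
      val src = queue.dequeue()
      out += src.next()
      // Re-enqueue a source that still has elements so the others get a turn.
      if (src.hasNext) queue.enqueue(src)
    }
    out.result()
  }

  // Concat behaviour for comparison: drain each source fully before the next one.
  def concatStyle[T](sources: List[Iterator[T]]): List[T] =
    sources.flatMap(_.toList)
}
```

    In this model, three sources holding (1, 2, 3), (10, 20), and nothing
    interleave as 1, 10, 2, 20, 3 under `mergeStyle`, while `concatStyle`
    yields 1, 2, 3, 10, 20.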
    
    * refactor: share InflightSource family between FlattenConcat and FlattenMerge
    
    Motivation:
    After the previous commit, FlattenMerge grew its own copy of the
    `InflightSource[T]` hierarchy (Iterator/Range/Repeat/CompletedFuture/
    PendingFuture) duplicating what FlattenConcat already had in its
    companion object. Two near-identical families across two files is a
    maintenance hazard: any future tweak to the value-presented optimization
    (e.g. adding a new source type, fixing a Java-stream cleanup leak) would
    have to be mirrored, and the families had already drifted in small ways
    (e.g. `tryPull`/`cancel`/`materialize` declared abstract in concat with
    no-op overrides on every subclass; concat used `isClosed = true` for the
    completed-future variant while merge used `!_hasNext`).
    
    Modification:
    - Extract the common `InflightSource[T]` base and the five value-presented
      subclasses (Iterator/Range/Repeat/CompletedFuture/PendingFuture) into
      a new `pekko.stream.impl.fusing.InflightSources` package-private object.
    - Promote `tryPull` / `cancel` / `materialize` from abstract to concrete
      no-op defaults, so the value-presented subclasses no longer carry empty
      overrides. Stages that wrap a real `SubSinkInlet` (only FlattenConcat's
      `attachAndMaterializeSource` does this) override what they need.
    - Align `InflightCompletedFutureSource.isClosed` to FlattenConcat's
      `true` semantics — behaviorally equivalent in both stages, but more
      faithful to the source being a one-shot cached value.
    - Drop the `sealed` modifier on `InflightSource` so FlattenConcat's
      attached-substream anonymous subclass can still extend it from another
      file in the same package.
    - Remove the duplicate definitions from FlattenConcat's companion (now
      unused, drop the empty companion entirely) and from FlattenMerge's
      companion. Both stages import from the shared object instead.
    
    Result:
    - Net -176 lines of duplication; one canonical home for the
      optimization's data types.
    - Future additions (e.g. extending the optimization to other stream-of-
      streams stages such as `MergeMany`-style operators) only need to
      reference `InflightSources`.
    - All FlattenConcat / FlattenMerge / flatMapConcat parallelism tests
      remain green; MiMa is clean (`@InternalApi private[fusing]`).
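
    A rough illustration of the "concrete no-op defaults" change, modelled on
    the definitions removed from FlattenConcat's companion in this commit. It
    is a sketch only: the enclosing object name `InflightSourceSketch` is
    hypothetical, and the actual contents of InflightSources.scala may differ.

```scala
object InflightSourceSketch {
  // Base type: only the members that vary per source stay abstract.
  abstract class InflightSource[T] {
    def hasNext: Boolean
    def next(): T
    def isClosed: Boolean
    def hasFailed: Boolean = failure.isDefined
    def failure: Option[Throwable] = None
    // Concrete no-op defaults: value-presented subclasses need no overrides.
    def tryPull(): Unit = ()
    def cancel(cause: Throwable): Unit = ()
    def materialize(): Unit = ()
  }

  // A repeat source never completes and never needs pulling or cancelling.
  final class InflightRepeatSource[T](elem: T) extends InflightSource[T] {
    override def hasNext: Boolean = true
    override def next(): T = elem
    override def isClosed: Boolean = false
  }
}
```

    A subclass that wraps a real substream would override `tryPull`,
    `cancel`, and `materialize`; the value-presented ones inherit the no-ops.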
    
    * fix: address PR #2977 review feedback for value-presented sources
    
    Motivation:
    Copilot review on PR #2977 flagged two issues: (1) Java streams obtained
    via Source.fromJavaStream were converted to Scala iterators and fed into
    InflightIteratorSource, dropping the BaseStream close contract and leaking
    onClose handlers and underlying resources; (2) the test purporting to
    verify "no pre-materialization for value-presented sources" actually
    counted lazySingle materializations and was misleading because
    Source.lazySingle is itself not value-presented (non-VP).
    
    Modification:
    - Add InflightJavaStreamSource in InflightSources.scala that wraps the
      BaseStream directly, eagerly closes empty streams, closes on exhaustion,
      and closes on cancel.
    - Wire the new wrapper through FlattenConcat.addJavaStreamSource and
      FlattenMerge.addInflightJavaStreamSource so both stages honor the
      close contract on the value-presented fast path.
    - Cancel queued inflight sources in FlattenMerge.postStop so JavaStream
      resources held in the queue (not yet promoted to active SubSinkInlets)
      are released on stage termination.
    - Replace the misleading test with one that mixes value-presented and
      non-VP inner sources via lazySingle().buffer() and asserts the counter
      equals only the non-VP count, proving the VP fast path skips
      materialization.
    - Add two regression tests for the close contract: exhaustion of
      finite Java streams and downstream cancel against infinite ones.
    
    Result:
    Java streams routed through the VP fast path now close deterministically
    on exhaustion, cancel, and stage termination. The materialization-skip
    property is demonstrated by a meaningful counter test rather than a
    tautology. All 39 FlowFlattenMergeSpec tests pass.
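
    The close contract described above (eager close for empty streams, close
    on exhaustion, close on cancel) might look roughly like the following
    plain-Scala sketch. `ClosingStreamSource` is a hypothetical stand-in, not
    the actual InflightJavaStreamSource, whose code is not shown in this part
    of the message.

```scala
import java.util.stream.BaseStream

object JavaStreamCloseSketch {
  // Hypothetical wrapper that keeps the BaseStream so it can be closed later.
  final class ClosingStreamSource[T](stream: BaseStream[T, _]) {
    private val iterator = stream.iterator()
    private var closed = false

    // Eagerly close a stream that has no elements at all.
    if (!iterator.hasNext) close()

    def hasNext: Boolean = !closed && iterator.hasNext

    def next(): T = {
      val elem = iterator.next()
      // Close as soon as the last element has been handed out.
      if (!iterator.hasNext) close()
      elem
    }

    // Cancellation releases the stream even if elements remain.
    def cancel(): Unit = close()

    // Guarded so the onClose handlers run at most once from this wrapper.
    private def close(): Unit =
      if (!closed) {
        closed = true
        stream.close()
      }
  }
}
```

    Converting the stream to a bare Scala iterator, as the earlier revision
    did, loses the `stream` reference and with it any way to call `close()`.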
    
    * fix: drop recursive S bound from InflightJavaStreamSource for Scala 3
    
    Motivation:
    PR #2977 CI failed on Scala 3.3.7 with a Type Mismatch in FlattenConcat
    and FlattenMerge: the dispatch pattern matched JavaStreamSource[T, _] and
    forwarded the value to a helper requiring [S <: BaseStream[T, S]]. Scala 2
    implicitly skolemized the existential, but Scala 3 does not, causing the
    stream module to fail to compile on Scala 3.
    
    Modification:
    - Drop the recursive S type parameter from InflightJavaStreamSource;
      internally only iterator() and close() are invoked, both of which work
      on BaseStream[T, _].
    - Drop the matching S type parameter from FlattenConcat.addJavaStreamSource
      and FlattenMerge.addInflightJavaStreamSource, accepting
      JavaStreamSource[T, _] directly so the existential never needs to be opened.
    
    Result:
    Scala 3.3.7 cross-compile is clean, Scala 2.13 still compiles, MiMa is
    green, scalafmt/headerCheck pass, and FlowFlattenMergeSpec (39/39) plus
    FlowFlattenConcatSpec / FlowFlatMapConcatSpec all pass on both Scala
    versions.
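
    To illustrate why the recursive bound can be dropped: a helper that only
    calls `iterator()` and `close()` can accept `BaseStream[T, _]` without
    naming the self type. The helper below is a hypothetical sketch in plain
    Scala, not code from the PR.

```scala
import java.util.stream.BaseStream

object WildcardStreamSketch {
  // No [S <: BaseStream[T, S]] parameter: the second type argument stays a wildcard.
  def drainAndClose[T](stream: BaseStream[T, _]): List[T] = {
    val it = stream.iterator()
    val buf = List.newBuilder[T]
    try {
      while (it.hasNext) buf += it.next()
    } finally {
      stream.close()
    }
    buf.result()
  }
}
```

    A caller holding a value matched as `JavaStreamSource[T, _]` can pass the
    opened stream straight through, so no existential has to be opened.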
    
    * fix: treat Success(null) future inner sources as completion in flatten ops
    
    Motivation:
    GraphStages.FutureSource treats `Success(null)` as completion-without-element
    (see FutureSource.handle). The optimized value-presented fast paths in
    FlattenConcat and FlattenMerge added in #2977 forwarded `null` downstream,
    violating Reactive Streams' no-null rule and diverging from the materialized
    FutureSource. The InflightJavaStreamSource also failed to close the underlying
    BaseStream when user code in `iterator.next()` threw, leaking the resource.
    
    Modification:
    - InflightSources: gate value emission via `hasFutureElement` so both
      completed and pending future wrappers report `hasNext = false` on
      `Success(null)`; close the BaseStream when iterator.next() throws.
    - FlattenConcat / FlattenMerge `addCompletedFutureElem`: special-case
      `Success(null)` as discard, mirroring FutureSource.
    - FlattenConcat `tryPullNextSourceInQueue`: when a head InflightSource
      completes without emitting (Success(null)) and demand is still pending
      for a following SingleSource at the new head, push directly. Without
      this, the stage stalled when `in` was closed.
    - Tests: directional `Success(null)` coverage for both operators.
    
    Result:
    Inflight wrappers stay consistent with FutureSource semantics; no null is
    ever forwarded; resource leak on iterator failure is plugged; FlattenConcat
    no longer stalls when a pending future resolves to `Success(null)` ahead of
    a queued SingleSource.
    
    Tests: stream-tests/testOnly *FlatMap* *Flatten* *PrefixAndTail* *Concat*Spec
    References: PR #2977 review comments
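
    The `Success(null)` rule above can be sketched in plain Scala as a
    three-way decision. The name `hasFutureElement` comes from this commit
    message, but its signature here is an assumption, and the `Action` type
    is purely illustrative.

```scala
import scala.util.{ Failure, Success, Try }

object FutureNullSketch {
  // Assumed shape: a completed future carries an element only if it is a non-null success.
  def hasFutureElement[T](result: Try[T]): Boolean = result match {
    case Success(value) => value != null
    case Failure(_)     => false
  }

  // Illustrative outcome type for a completed future inner source.
  sealed trait Action[+T]
  final case class Emit[T](value: T) extends Action[T]
  case object Discard extends Action[Nothing]
  final case class Fail(cause: Throwable) extends Action[Nothing]

  def onCompleted[T](result: Try[T]): Action[T] = result match {
    case Failure(ex)                               => Fail(ex)
    case Success(value) if hasFutureElement(result) => Emit(value)
    case Success(_)                                => Discard // Success(null): complete without an element
  }
}
```

    With this shape, `null` is never handed downstream: it maps to `Discard`,
    which matches the completion-without-element behaviour described above.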
    
    * fix: surface settled head source in FlattenConcat tryPullNextSourceInQueue
    
    Motivation:
    When the previous head source completed without emitting (e.g. a pending
    future resolving to Success(null), which the optimized fast path treats as
    completion-without-element to mirror GraphStages.FutureSource), a settled
    InflightSource at the new head — InflightCompletedFutureSource(Failure(_))
    or InflightCompletedFutureSource(Success(_)) — was stranded in the queue:
    tryPullNextSourceInQueue called src.tryPull(), which is a no-op for
    already-settled sources. With `in` already closed, no further onPull would
    fire and the stage stalled.
    
    Modification:
    Drive the new InflightSource head via pushOut when demand is pending, so a
    queued failure surfaces via handleCurrentSourceClosed and a queued
    completed-future value gets emitted. Add directional tests covering both
    paths in FlowFlatMapConcatParallelismSpec.
    
    Result:
    Failures and completed-future values queued behind a Success(null) pending
    future propagate instead of stalling; existing flatten suites stay green.
    
    Tests: stream-tests/testOnly FlowFlatMapConcatParallelismSpec FlowFlattenMergeSpec
---
 .../FlowFlatMapConcatParallelismSpec.scala         |  47 ++++++
 .../stream/scaladsl/FlowFlattenMergeSpec.scala     | 175 +++++++++++++++++++-
 .../pekko/stream/impl/fusing/FlattenConcat.scala   | 174 +++++++-------------
 .../pekko/stream/impl/fusing/InflightSources.scala | 181 +++++++++++++++++++++
 .../pekko/stream/impl/fusing/StreamOfStreams.scala | 178 ++++++++++++++++----
 5 files changed, 608 insertions(+), 147 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
index 483838aece..59c61b8ff9 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala
@@ -149,6 +149,53 @@ class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
         .futureValue should ===(1 to 4)
     }
 
+    "treat Success(null) future inner sources as completion-without-element" in {
+      // Mirrors GraphStages.FutureSource semantics: Success(null) completes the inner
+      // source without emitting. The optimized fast path must match this.
+      val toIntegerSeq = Flow[Integer].grouped(1000).toMat(Sink.head)(Keep.right)
+      Source(
+        List[Source[Integer, NotUsed]](
+          Source.future(Future.successful[Integer](null)),
+          Source.single[Integer](1),
+          Source.future(after(1.millis)(Future.successful[Integer](null))),
+          Source.lazyFuture(() => Future.successful[Integer](null)),
+          Source.single[Integer](2)))
+        .flatMapConcat(ThreadLocalRandom.current().nextInt(1, 129), identity)
+        .runWith(toIntegerSeq)
+        .futureValue should ===(Seq[Integer](1, 2))
+    }
+
+    "propagate inner-source failure queued behind a Success(null) pending future" in {
+      // Regression: tryPullNextSourceInQueue used to call src.tryPull() for
+      // InflightSource heads, which is a no-op for already-settled sources
+      // (e.g. InflightCompletedFutureSource(Failure(_))). Once the pending
+      // Success(null) head was removed, the failure sat in the queue forever
+      // because nothing surfaced it. Drive the head via pushOut instead.
+      val ex = new BoomException
+      val probe = Source(
+        List[Source[Integer, NotUsed]](
+          Source.future(after(1.millis)(Future.successful[Integer](null))),
+          Source.failed[Integer](ex)))
+        .flatMapConcat(parallelism = 4, identity)
+        .runWith(TestSink())
+      probe.request(1)
+      probe.expectError() should ===(ex)
+    }
+
+    "emit completed future queued behind a Success(null) pending future" in {
+      // Same root cause as above: an InflightCompletedFutureSource(Success(_))
+      // queued behind a Success(null) pending future was stranded because
+      // tryPull() is a no-op on already-settled sources.
+      val toIntegerSeq = Flow[Integer].grouped(1000).toMat(Sink.head)(Keep.right)
+      Source(
+        List[Source[Integer, NotUsed]](
+          Source.future(after(1.millis)(Future.successful[Integer](null))),
+          Source.future(Future.successful[Integer](42))))
+        .flatMapConcat(parallelism = 4, identity)
+        .runWith(toIntegerSeq)
+        .futureValue should ===(Seq[Integer](42))
+    }
+
     "work with value presented sources when demands slow" in {
       val prob = Source(
         List(Source.empty[Int], Source.single(1), Source(List(2, 3, 4)), Source.lazyFuture(() => Future.successful(5))))
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
index 9ae6a231dc..28d44ecaca 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
@@ -13,11 +13,18 @@
 
 package org.apache.pekko.stream.scaladsl
 
+import java.util.Collections
+import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.annotation.switch
 import scala.concurrent._
 import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
 
 import org.apache.pekko
 import pekko.NotUsed
+import pekko.pattern.FutureTimeoutSupport
 import pekko.stream._
 import pekko.stream.stage.GraphStage
 import pekko.stream.stage.GraphStageLogic
@@ -30,9 +37,11 @@ import pekko.testkit.TestLatch
 
 import org.scalatest.exceptions.TestFailedException
 
-class FlowFlattenMergeSpec extends StreamSpec {
+class FlowFlattenMergeSpec extends StreamSpec with FutureTimeoutSupport {
   import system.dispatcher
 
+  class BoomException extends RuntimeException("BOOM~~") with NoStackTrace
+
   def src10(i: Int) = Source(i until (i + 10))
   def blocked = Source.future(Promise[Int]().future)
 
@@ -280,5 +289,169 @@ class FlowFlattenMergeSpec extends StreamSpec {
       probe.expectComplete()
     }
 
+    val checkBreadths = List(1, 2, 4, 8, 16, 32, 64, 128)
+
+    for (b <- checkBreadths) {
+      s"work with value presented sources with breadth: $b" in {
+        Source(
+          List(
+            Source.empty[Int],
+            Source.single(1),
+            Source.empty[Int],
+            Source(List(2, 3, 4)),
+            Source.future(Future.successful(5)),
+            Source.lazyFuture(() => Future.successful(6)),
+            Source.future(after(1.millis)(Future.successful(7)))))
+          .flatMapMerge(b, identity)
+          .runWith(toSet)
+          .futureValue should ===((1 to 7).toSet)
+      }
+    }
+
+    def generateRandomValuePresentedSources(nums: Int): (Int, List[Source[Int, NotUsed]]) = {
+      val seq = List.tabulate(nums) { _ =>
+        val random = ThreadLocalRandom.current().nextInt(1, 10)
+        (random: @switch) match {
+          case 1 => Source.single(1)
+          case 2 => Source(List(1))
+          case 3 => Source.fromJavaStream(() => Collections.singleton(1).stream())
+          case 4 => Source.future(Future.successful(1))
+          case 5 => Source.future(after(1.millis)(Future.successful(1)))
+          case _ => Source.empty[Int]
+        }
+      }
+      val sum = seq.filterNot(_.eq(Source.empty[Int])).size
+      (sum, seq)
+    }
+
+    for (b <- checkBreadths) {
+      s"work with generated value presented sources with breadth: $b " in {
+        val (sum, sources @ _) = generateRandomValuePresentedSources(10000)
+        Source(sources)
+          .flatMapMerge(b, identity(_))
+          .runWith(Sink.seq)
+          .map(_.sum)(scala.concurrent.ExecutionContext.parasitic)
+          .futureValue shouldBe sum
+      }
+    }
+
+    "work with value presented failed sources" in {
+      val ex = new BoomException
+      Source(
+        List(
+          Source.empty[Int],
+          Source.single(1),
+          Source.empty[Int],
+          Source(List(2, 3, 4)),
+          Source.future(Future.failed(ex)),
+          Source.lazyFuture(() => Future.successful(5))))
+        .flatMapMerge(ThreadLocalRandom.current().nextInt(1, 129), identity)
+        .onErrorComplete[BoomException]()
+        .runWith(toSet)
+        .futureValue.subsetOf((1 to 5).toSet) should ===(true)
+    }
+
+    "treat Success(null) future inner sources as completion-without-element" in {
+      // Mirrors GraphStages.FutureSource semantics: Success(null) completes the inner
+      // source without emitting. The optimized fast path must match this.
+      val toIntegerSet = Flow[Integer].grouped(1000).toMat(Sink.head)(Keep.right).mapMaterializedValue(_.map(_.toSet))
+      Source(
+        List[Source[Integer, NotUsed]](
+          Source.future(Future.successful[Integer](null)),
+          Source.single[Integer](1),
+          Source.future(after(1.millis)(Future.successful[Integer](null))),
+          Source.lazyFuture(() => Future.successful[Integer](null)),
+          Source.single[Integer](2)))
+        .flatMapMerge(ThreadLocalRandom.current().nextInt(1, 129), identity)
+        .runWith(toIntegerSet)
+        .futureValue should ===(Set[Integer](1, 2))
+    }
+
+    val breadth = ThreadLocalRandom.current().nextInt(4, 65)
+    s"avoid pre-materialization for value-presented sources, breadth = $breadth" in {
+      val materializationCounter = new AtomicInteger(0)
+      val n = breadth * 3
+      val probe = Source(1 to n)
+        .flatMapMerge(
+          breadth,
+          value =>
+            Source.lazySingle(() => {
+              materializationCounter.incrementAndGet()
+              value
+            }))
+        .runWith(TestSink())
+
+      probe.request(n.toLong)
+      probe.expectNextN(n.toLong).toSet should ===((1 to n).toSet)
+      probe.expectComplete()
+      // Source.lazySingle is not a value-presented source, so each is materialized.
+      materializationCounter.get() shouldBe n
+    }
+
+    s"only materialize non-value-presented inner sources, breadth = $breadth" in {
+      val materializationCounter = new AtomicInteger(0)
+      val n = breadth * 3
+      // Mix value-presented (Source.single, fast path) with non-value-presented
+      // (lazySingle.buffer, slow path). The counter sits inside the lazySingle
+      // factory and only fires when the inner source is materialized as a substream.
+      val probe = Source(1 to (n * 2))
+        .flatMapMerge(
+          breadth,
+          value =>
+            if (value % 2 == 0) Source.single(value)
+            else
+              Source
+                .lazySingle(() => {
+                  materializationCounter.incrementAndGet()
+                  value
+                })
+                .buffer(1, overflowStrategy = OverflowStrategy.backpressure))
+        .runWith(TestSink())
+
+      probe.request(n.toLong * 2)
+      probe.expectNextN(n.toLong * 2).toSet should ===((1 to (n * 2)).toSet)
+      probe.expectComplete()
+      // Only odd values (non-VP) take the substream materialization path.
+      materializationCounter.get() shouldBe n
+    }
+
+    "close JavaStream-backed inner sources on exhaustion" in {
+      val closeCount = new AtomicInteger(0)
+      val streams = (1 to 4).toList
+      Source(streams)
+        .flatMapMerge(
+          4,
+          (n: Int) =>
+            Source.fromJavaStream(() =>
+              java.util.stream.Stream.of((1 to n).map(Integer.valueOf): _*).onClose(() =>
+                closeCount.incrementAndGet())))
+        .runWith(Sink.ignore)
+        .futureValue
+      closeCount.get() shouldBe streams.size
+    }
+
+    "close JavaStream-backed inner sources on downstream cancel" in {
+      val closeCount = new AtomicInteger(0)
+      // Endless inner streams; when downstream cancels, the inflight wrappers
+      // queued in FlattenMerge must close their underlying Java streams.
+      val probe = Source
+        .repeat(())
+        .flatMapMerge(
+          4,
+          _ =>
+            Source.fromJavaStream(() =>
+              java.util.stream.Stream
+                .generate[Integer](() => 1)
+                .onClose(() => closeCount.incrementAndGet())))
+        .runWith(TestSink())
+
+      probe.request(8)
+      probe.expectNextN(8)
+      probe.cancel()
+      awaitAssert {
+        closeCount.get() should be >= 1
+      }
+    }
+
   }
 }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
index 7592733d35..2aefcc9d57 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala
@@ -27,103 +27,11 @@ import pekko.stream.{ Attributes, FlowShape, Graph, Inlet, Outlet, SourceShape,
 import pekko.stream.impl.{ Buffer => BufferImpl, FailedSource, JavaStreamSource, TraversalBuilder }
 import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, SingleSource }
+import pekko.stream.impl.fusing.InflightSources._
 import pekko.stream.scaladsl.Source
 import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
 import pekko.util.OptionVal
 
-/**
- * INTERNAL API
- */
-@InternalApi
-private[pekko] object FlattenConcat {
-  private sealed abstract class InflightSource[T] {
-    def hasNext: Boolean
-    def next(): T
-    def tryPull(): Unit
-    def cancel(cause: Throwable): Unit
-    def isClosed: Boolean
-    def hasFailed: Boolean = failure.isDefined
-    def failure: Option[Throwable] = None
-    def materialize(): Unit = ()
-  }
-
-  private final class InflightIteratorSource[T](iterator: Iterator[T]) extends InflightSource[T] {
-    override def hasNext: Boolean = iterator.hasNext
-    override def next(): T = iterator.next()
-    override def tryPull(): Unit = ()
-    override def cancel(cause: Throwable): Unit = ()
-    override def isClosed: Boolean = !hasNext
-  }
-
-  private final class InflightRangeSource[T](range: immutable.Range) extends InflightSource[T] {
-    private val isEmptyRange = range.isEmpty
-    private val rangeLast = if (isEmptyRange) 0 else range.last
-    private val rangeStep = range.step
-    private var nextElement = range.start
-    private var closed = isEmptyRange
-
-    override def hasNext: Boolean = !closed
-    override def next(): T =
-      if (closed) throw new NoSuchElementException("next called after completion")
-      else {
-        val current = nextElement
-        if (current == rangeLast) closed = true
-        else nextElement = current + rangeStep
-        current.asInstanceOf[T]
-      }
-    override def tryPull(): Unit = ()
-    override def cancel(cause: Throwable): Unit = ()
-    override def isClosed: Boolean = closed
-  }
-
-  private final class InflightRepeatSource[T](elem: T) extends InflightSource[T] {
-    override def hasNext: Boolean = true
-    override def next(): T = elem
-    override def tryPull(): Unit = ()
-    override def cancel(cause: Throwable): Unit = ()
-    override def isClosed: Boolean = false
-  }
-
-  private final class InflightCompletedFutureSource[T](result: Try[T]) extends InflightSource[T] {
-    private var _hasNext = result.isSuccess
-    override def hasNext: Boolean = _hasNext
-    override def next(): T = {
-      if (_hasNext) {
-        _hasNext = false
-        result.get
-      } else throw new NoSuchElementException("next called after completion")
-    }
-    override def hasFailed: Boolean = result.isFailure
-    override def failure: Option[Throwable] = result.failed.toOption
-    override def tryPull(): Unit = ()
-    override def cancel(cause: Throwable): Unit = ()
-    override def isClosed: Boolean = true
-  }
-
-  private final class InflightPendingFutureSource[T](cb: InflightSource[T] => Unit)
-      extends InflightSource[T]
-      with (Try[T] => Unit) {
-    private var result: Try[T] = MapAsync.NotYetThere
-    private var consumed = false
-    override def apply(result: Try[T]): Unit = {
-      this.result = result
-      cb(this)
-    }
-    override def hasNext: Boolean = (result ne MapAsync.NotYetThere) && !consumed && result.isSuccess
-    override def next(): T = {
-      if (!consumed) {
-        consumed = true
-        result.get
-      } else throw new NoSuchElementException("next called after completion")
-    }
-    override def hasFailed: Boolean = (result ne MapAsync.NotYetThere) && result.isFailure
-    override def failure: Option[Throwable] = if (result eq MapAsync.NotYetThere) None else result.failed.toOption
-    override def tryPull(): Unit = ()
-    override def cancel(cause: Throwable): Unit = ()
-    override def isClosed: Boolean = consumed || hasFailed
-  }
-}
-
 /**
  * INTERNAL API
  */
@@ -138,7 +46,6 @@ private[pekko] final class FlattenConcat[T, M](parallelism: Int)
   override val shape: FlowShape[Graph[SourceShape[T], M], T] = FlowShape(in, out)
   override def createLogic(enclosingAttributes: Attributes) = {
     object FlattenConcatLogic extends GraphStageLogic(shape) with InHandler with OutHandler {
-      import FlattenConcat._
       // InflightSource[T] or SingleSource[T]
       // AnyRef here to avoid lift the SingleSource[T] to InflightSource[T]
       private var queue: BufferImpl[AnyRef] = _
@@ -150,7 +57,9 @@ private[pekko] final class FlattenConcat[T, M](parallelism: Int)
       private def futureSourceCompleted(futureSource: InflightSource[T]): Unit = {
         if (queue.peek() eq futureSource) {
           if (isAvailable(out) && futureSource.hasNext) {
-            push(out, futureSource.next()) // TODO should filter out the `null` here?
+            // Success(null) is filtered out via InflightPendingFutureSource.hasNext to stay
+            // consistent with GraphStages.FutureSource (which treats Success(null) as completion).
+            push(out, futureSource.next())
             if (futureSource.isClosed) {
               handleCurrentSourceClosed(futureSource)
             }
@@ -269,17 +178,43 @@ private[pekko] final class FlattenConcat[T, M](parallelism: Int)
         queue.enqueue(inflightSource)
       }
 
-      private def addCompletedFutureElem(elem: Try[T]): Unit = {
+      private def addJavaStreamSource(javaStream: JavaStreamSource[T, _]): Unit = {
+        val inflightSource = new InflightJavaStreamSource[T](javaStream.open)
         if (isAvailable(out) && queue.isEmpty) {
-          elem match {
-            case scala.util.Success(value) => push(out, value)
-            case scala.util.Failure(ex)    => onUpstreamFailure(ex)
+          if (inflightSource.hasNext) {
+            push(out, inflightSource.next())
+            if (inflightSource.hasNext) {
+              queue.enqueue(inflightSource)
+            }
           }
-        } else {
-          queue.enqueue(new InflightCompletedFutureSource(elem))
+        } else if (inflightSource.hasNext) {
+          queue.enqueue(inflightSource)
         }
       }
 
+      private def addCompletedFutureElem(elem: Try[T]): Unit = elem match {
+        // DO NOT CHANGE
+        // WHY: GraphStages.FutureSource treats Success(null) as completion-without-element
+        // (see FutureSource.handle: `case Success(null) => completeStage()`). The fast path
+        // here MUST mirror that — pushing null would violate Reactive Streams' no-null rule
+        // and diverge from the materialized FutureSource behaviour.
+        // How to apply: keep this branch in sync with FutureSource semantics; if FutureSource
+        // ever changes how it treats Success(null), update here too.
+        case scala.util.Success(null)  => // empty inner source: discard, slot is freed when caller dequeues
+        case scala.util.Success(value) =>
+          if (isAvailable(out) && queue.isEmpty) {
+            push(out, value)
+          } else {
+            queue.enqueue(new InflightCompletedFutureSource(elem))
+          }
+        case scala.util.Failure(ex) =>
+          if (isAvailable(out) && queue.isEmpty) {
+            onUpstreamFailure(ex)
+          } else {
+            queue.enqueue(new InflightCompletedFutureSource(elem))
+          }
+      }
+
       private def addPendingFutureElem(future: Future[T]): Unit = {
         val inflightSource = new InflightPendingFutureSource[T](invokeCb)
         future.onComplete(inflightSource)(scala.concurrent.ExecutionContext.parasitic)
@@ -336,13 +271,11 @@ private[pekko] final class FlattenConcat[T, M](parallelism: Int)
                   case Some(elem) => addCompletedFutureElem(elem)
                   case None       => addPendingFutureElem(future)
                 }
-              case iterable: IterableSource[T] @unchecked        => addSourceElements(iterable.elements.iterator)
-              case iterator: IteratorSource[T] @unchecked        => addSourceElements(iterator.createIterator())
-              case range: RangeSource[T] @unchecked              => addRangeSource(range.range)
-              case repeat: RepeatSource[T] @unchecked            => addRepeatSource(repeat.elem)
-              case javaStream: JavaStreamSource[T, _] @unchecked =>
-                import scala.jdk.CollectionConverters._
-                addSourceElements(javaStream.open().iterator.asScala)
+              case iterable: IterableSource[T] @unchecked                   => addSourceElements(iterable.elements.iterator)
+              case iterator: IteratorSource[T] @unchecked                   => addSourceElements(iterator.createIterator())
+              case range: RangeSource[T] @unchecked                         => addRangeSource(range.range)
+              case repeat: RepeatSource[T] @unchecked                       => addRepeatSource(repeat.elem)
+              case javaStream: JavaStreamSource[T, _] @unchecked            => addJavaStreamSource(javaStream)
               case failed: FailedSource[T] @unchecked                       => addCompletedFutureElem(Failure(failed.failure))
               case maybeEmpty if TraversalBuilder.isEmptySource(maybeEmpty) => // Empty source is discarded
               case _                                                        => attachAndMaterializeSource(source)
@@ -382,12 +315,25 @@ private[pekko] final class FlattenConcat[T, 
M](parallelism: Int)
         }
       }
 
-      private def tryPullNextSourceInQueue(): Unit = {
-        // pull the new emitting source
-        val nextSource = queue.peek()
-        if (nextSource.isInstanceOf[InflightSource[T] @unchecked]) {
-          nextSource.asInstanceOf[InflightSource[T]].tryPull()
-        }
+      private def tryPullNextSourceInQueue(): Unit = queue.peek() match {
+        // DO NOT CHANGE
+        // WHY: The previous head source may complete without emitting (e.g. a pending
+        // future resolving to Success(null), which we treat as completion-without-
+        // element to mirror GraphStages.FutureSource). When that happens with demand
+        // still pending and `in` already closed, no further onPull will fire — the
+        // stage would stall on whatever sits next in the queue. Drive the new head
+        // directly: push SingleSource, or pushOut for InflightSource (which also
+        // surfaces a queued InflightCompletedFutureSource failure, otherwise the
+        // failure would never propagate).
+        case src: SingleSource[T] @unchecked =>
+          if (isAvailable(out)) {
+            push(out, src.elem)
+            removeSource()
+          }
+        case src: InflightSource[T] @unchecked =>
+          if (isAvailable(out)) pushOut(src)
+          else src.tryPull()
+        case _ => // queue empty or unexpected: nothing to pull
       }
 
       setHandlers(in, out, this)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/InflightSources.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/InflightSources.scala
new file mode 100644
index 0000000000..3d4a11c039
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/InflightSources.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import scala.collection.immutable
+import scala.util.{ Success, Try }
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+
+/**
+ * INTERNAL API
+ *
+ * Lightweight in-memory representations of value-presented `Source`s that can
+ * be consumed without paying for substream materialization. Shared between
+ * [[FlattenConcat]] and [[FlattenMerge]].
+ */
+@InternalApi
+private[fusing] object InflightSources {
+
+  /**
+   * Common base. The optimized value-presented variants below have no
+   * upstream to pull from or cancel, so `tryPull` / `cancel` / `materialize`
+   * default to no-ops. Stages that wrap a real `SubSinkInlet` (for sources
+   * that still require materialization) override these as needed.
+   */
+  private[fusing] abstract class InflightSource[T] {
+    def hasNext: Boolean
+    def next(): T
+    def isClosed: Boolean
+    def tryPull(): Unit = ()
+    def cancel(cause: Throwable): Unit = ()
+    def materialize(): Unit = ()
+    def hasFailed: Boolean = failure.isDefined
+    def failure: Option[Throwable] = None
+  }
+
+  private[fusing] final class InflightIteratorSource[T](iterator: Iterator[T]) extends InflightSource[T] {
+    override def hasNext: Boolean = iterator.hasNext
+    override def next(): T = iterator.next()
+    override def isClosed: Boolean = !hasNext
+  }
+
+  /**
+   * Wraps a [[java.util.stream.BaseStream]] so it can be consumed without
+   * substream materialization while still honoring the close contract: the
+   * underlying stream is closed on exhaustion, on `cancel`, and eagerly when
+   * the spliterator advertises it as empty.
+   *
+   * The recursive `S <: BaseStream[T, S]` bound that [[JavaStreamSource]]
+   * carries is intentionally dropped here: only `iterator()` and `close()`
+   * are invoked internally, and both are available on `BaseStream[T, _]`.
+   * Keeping the bound would force callers to skolemize the existential
+   * captured by pattern matching `JavaStreamSource[T, _]`, which Scala 3
+   * does not do implicitly.
+   */
+  private[fusing] final class InflightJavaStreamSource[T](
+      open: () => java.util.stream.BaseStream[T, _]) extends InflightSource[T] {
+    private val stream: java.util.stream.BaseStream[T, _] = open()
+    private val iterator: java.util.Iterator[T] = stream.iterator()
+    private var closed: Boolean = false
+    // Eagerly close empty streams so we don't leak the resource for empty inner sources.
+    if (!iterator.hasNext) closeStream()
+
+    private def closeStream(): Unit =
+      if (!closed) {
+        closed = true
+        try stream.close()
+        catch { case NonFatal(_) => () }
+      }
+
+    override def hasNext: Boolean = !closed
+    override def next(): T =
+      if (closed) throw new NoSuchElementException("next called after completion")
+      else {
+        try {
+          val elem = iterator.next()
+          if (!iterator.hasNext) closeStream()
+          elem
+        } catch {
+          case NonFatal(ex) =>
+            // If user code in iterator.next() throws, ensure the BaseStream is
+            // closed before propagating the failure: postStop on the enclosing
+            // FlattenMerge/FlattenConcat may not have a chance to cancel us.
+            closeStream()
+            throw ex
+        }
+      }
+    override def isClosed: Boolean = closed
+    override def cancel(cause: Throwable): Unit = closeStream()
+  }
+
+  private[fusing] final class InflightRangeSource[T](range: immutable.Range) extends InflightSource[T] {
+    private val isEmptyRange = range.isEmpty
+    private val rangeLast = if (isEmptyRange) 0 else range.last
+    private val rangeStep = range.step
+    private var nextElement = range.start
+    private var closed = isEmptyRange
+
+    override def hasNext: Boolean = !closed
+    override def next(): T =
+      if (closed) throw new NoSuchElementException("next called after completion")
+      else {
+        val current = nextElement
+        if (current == rangeLast) closed = true
+        else nextElement = current + rangeStep
+        current.asInstanceOf[T]
+      }
+    override def isClosed: Boolean = closed
+  }
+
+  private[fusing] final class InflightRepeatSource[T](elem: T) extends InflightSource[T] {
+    override def hasNext: Boolean = true
+    override def next(): T = elem
+    override def isClosed: Boolean = false
+  }
+
+  // DO NOT CHANGE
+  // WHY: GraphStages.FutureSource treats Success(null) as completion-without-element
+  // (see FutureSource.handle: `case Success(null) => completeStage()`). The inflight
+  // wrappers below MUST stay consistent with that behaviour, otherwise the optimized
+  // value-presented fast path would emit null — violating Reactive Streams' no-null
+  // rule and diverging from the materialized FutureSource. If FutureSource semantics
+  // are ever changed, these wrappers must be updated in lock-step.
+  private def hasFutureElement[T](result: Try[T]): Boolean = result match {
+    case Success(v) => v != null
+    case _          => false
+  }
+
+  private[fusing] final class InflightCompletedFutureSource[T](result: Try[T]) extends InflightSource[T] {
+    private var _hasNext = hasFutureElement(result)
+    override def hasNext: Boolean = _hasNext
+    override def next(): T =
+      if (_hasNext) {
+        _hasNext = false
+        result.get
+      } else throw new NoSuchElementException("next called after completion")
+    override def hasFailed: Boolean = result.isFailure
+    override def failure: Option[Throwable] = result.failed.toOption
+    // The future has already produced its value (or failure); the source is
+    // fundamentally one-shot and reports as closed even before consumption.
+    override def isClosed: Boolean = true
+  }
+
+  private[fusing] final class InflightPendingFutureSource[T](cb: InflightSource[T] => Unit)
+      extends InflightSource[T]
+      with (Try[T] => Unit) {
+    private var result: Try[T] = MapAsync.NotYetThere
+    private var consumed = false
+    override def apply(result: Try[T]): Unit = {
+      this.result = result
+      cb(this)
+    }
+    override def hasNext: Boolean = (result ne MapAsync.NotYetThere) && !consumed && hasFutureElement(result)
+    override def next(): T =
+      if (!consumed) {
+        consumed = true
+        result.get
+      } else throw new NoSuchElementException("next called after completion")
+    override def hasFailed: Boolean = (result ne MapAsync.NotYetThere) && result.isFailure
+    override def failure: Option[Throwable] = if (result eq MapAsync.NotYetThere) None else result.failed.toOption
+    override def isClosed: Boolean = consumed || hasFailed ||
+      ((result ne MapAsync.NotYetThere) && !hasFutureElement(result))
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala
index 5b6c82745f..bf9dd86433 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala
@@ -18,8 +18,10 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.annotation.{ nowarn, tailrec }
 import scala.collection.immutable
+import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
 import scala.jdk.CollectionConverters._
+import scala.util.{ Failure, Success, Try }
 import scala.util.control.NonFatal
 
 import org.apache.pekko
@@ -30,13 +32,14 @@ import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
 import pekko.stream.ActorAttributes.SupervisionStrategy
 import pekko.stream.Attributes.SourceLocation
 import pekko.stream.Supervision.Decider
-import pekko.stream.impl.{ Buffer => BufferImpl }
+import pekko.stream.impl.{ Buffer => BufferImpl, FailedSource, JavaStreamSource }
 import pekko.stream.impl.ActorSubscriberMessage
 import pekko.stream.impl.ActorSubscriberMessage.OnError
 import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.impl.SubscriptionTimeoutException
 import pekko.stream.impl.TraversalBuilder
-import pekko.stream.impl.fusing.GraphStages.SingleSource
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, SingleSource }
+import pekko.stream.impl.fusing.InflightSources._
 import pekko.stream.scaladsl._
 import pekko.stream.stage._
 import pekko.util.OptionVal
@@ -57,14 +60,33 @@ import pekko.util.OptionVal
     new GraphStageLogic(shape) with OutHandler with InHandler {
       var sources = Set.empty[SubSinkInlet[T]]
       var pendingSingleSources = 0
-      def activeSources = sources.size + pendingSingleSources
+      var pendingInflightSources = 0
+      def activeSources = sources.size + pendingSingleSources + pendingInflightSources
 
-      // To be able to optimize for SingleSource without materializing them the queue may hold either
-      // SubSinkInlet[T] or SingleSource
+      // To be able to optimize for value-presented sources without materializing them, the queue may hold
+      // SubSinkInlet[T], SingleSource, or InflightSource[T]
       var queue: BufferImpl[AnyRef] = _
 
       override def preStart(): Unit = queue = BufferImpl(breadth, enclosingAttributes)
 
+      private val invokeCb: InflightSource[T] => Unit =
+        getAsyncCallback[InflightSource[T]](inflightFutureCompleted).invoke
+
+      private def inflightFutureCompleted(source: InflightSource[T]): Unit = {
+        if (source.hasFailed) {
+          failStage(source.failure.get)
+        } else if (source.hasNext) {
+          if (isAvailable(out) && queue.isEmpty) {
+            push(out, source.next())
+            removeSource(source)
+          } else {
+            queue.enqueue(source)
+          }
+        } else {
+          removeSource(source)
+        }
+      }
+
       def pushOut(): Unit = {
         queue.dequeue() match {
           case src: SubSinkInlet[T] @unchecked =>
@@ -74,6 +96,10 @@ import pekko.util.OptionVal
           case single: SingleSource[T] @unchecked =>
             push(out, single.elem)
             removeSource(single)
+          case inflight: InflightSource[T] @unchecked =>
+            push(out, inflight.next())
+            if (inflight.isClosed) removeSource(inflight)
+            else queue.enqueue(inflight)
           case other =>
             throw new IllegalStateException(s"Unexpected source type in queue: '${other.getClass}'")
         }
@@ -100,36 +126,111 @@ import pekko.util.OptionVal
       setHandlers(in, out, this)
 
       def addSource(source: Graph[SourceShape[T], M]): Unit = {
-        // If it's a SingleSource or wrapped such we can push the element directly instead of materializing it.
-        // Have to use AnyRef because of OptionVal null value.
-        TraversalBuilder.getSingleSource(source.asInstanceOf[Graph[SourceShape[AnyRef], M]]) match {
-          case OptionVal.Some(single) =>
-            if (isAvailable(out) && queue.isEmpty) {
-              push(out, single.elem.asInstanceOf[T])
-            } else {
-              queue.enqueue(single)
-              pendingSingleSources += 1
-            }
-          case _ =>
-            val sinkIn = new SubSinkInlet[T]("FlattenMergeSink")
-            sinkIn.setHandler(new InHandler {
-              override def onPush(): Unit = {
-                if (isAvailable(out)) {
-                  push(out, sinkIn.grab())
-                  sinkIn.pull()
-                } else {
-                  queue.enqueue(sinkIn)
+        // If it's a value-presented source (or wrapped such) we can avoid substream materialization.
+        TraversalBuilder.getValuePresentedSource(source) match {
+          case OptionVal.Some(graph) =>
+            graph match {
+              case single: SingleSource[T] @unchecked       => addSingleSource(single)
+              case futureSource: FutureSource[T] @unchecked =>
+                val future = futureSource.future
+                future.value match {
+                  case Some(elem) => addCompletedFutureElem(elem)
+                  case None       => addPendingFutureElem(future)
                 }
-              }
-              override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn)
-            })
-            sinkIn.pull()
-            sources += sinkIn
-            val graph = Source.fromGraph(source).to(sinkIn.sink)
-            interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes)
+              case iterable: IterableSource[T] @unchecked                   => addInflightIteratorSource(iterable.elements.iterator)
+              case iterator: IteratorSource[T] @unchecked                   => addInflightIteratorSource(iterator.createIterator())
+              case range: RangeSource[T] @unchecked                         => addInflightRangeSource(range.range)
+              case repeat: RepeatSource[T] @unchecked                       => addInflightRepeatSource(repeat.elem)
+              case javaStream: JavaStreamSource[T, _] @unchecked            => addInflightJavaStreamSource(javaStream)
+              case failed: FailedSource[T] @unchecked                       => addCompletedFutureElem(Failure(failed.failure))
+              case maybeEmpty if TraversalBuilder.isEmptySource(maybeEmpty) => // Empty source is discarded
+              case _                                                        => attachAndMaterializeSource(source)
+            }
+          case _ => attachAndMaterializeSource(source)
+        }
+      }
+
+      private def addSingleSource(single: SingleSource[T]): Unit = {
+        if (isAvailable(out) && queue.isEmpty) {
+          push(out, single.elem)
+        } else {
+          queue.enqueue(single)
+          pendingSingleSources += 1
+        }
+      }
+
+      private def addInflightSource(inflight: InflightSource[T]): Unit = {
+        if (isAvailable(out) && queue.isEmpty) {
+          push(out, inflight.next())
+          if (!inflight.isClosed) {
+            queue.enqueue(inflight)
+            pendingInflightSources += 1
+          }
+        } else {
+          queue.enqueue(inflight)
+          pendingInflightSources += 1
         }
       }
 
+      private def addInflightIteratorSource(iterator: Iterator[T]): Unit =
+        if (iterator.hasNext) addInflightSource(new InflightIteratorSource[T](iterator))
+
+      private def addInflightRangeSource(range: immutable.Range): Unit =
+        if (range.nonEmpty) addInflightSource(new InflightRangeSource[T](range))
+
+      private def addInflightRepeatSource(elem: T): Unit =
+        addInflightSource(new InflightRepeatSource[T](elem))
+
+      private def addInflightJavaStreamSource(javaStream: JavaStreamSource[T, _]): Unit = {
+        val inflight = new InflightJavaStreamSource[T](javaStream.open)
+        if (inflight.hasNext) addInflightSource(inflight)
+      }
+
+      private def addCompletedFutureElem(elem: Try[T]): Unit = elem match {
+        // DO NOT CHANGE
+        // WHY: GraphStages.FutureSource treats Success(null) as completion-without-element
+        // (see FutureSource.handle: `case Success(null) => completeStage()`). The fast path
+        // here MUST mirror that — pushing null would violate Reactive Streams' no-null rule
+        // and diverge from the materialized FutureSource behaviour.
+        // How to apply: keep this branch in sync with FutureSource semantics; if FutureSource
+        // ever changes how it treats Success(null), update here too.
+        case Success(null)  => // empty inner source: discard, slot is freed by caller
+        case Success(value) =>
+          if (isAvailable(out) && queue.isEmpty) {
+            push(out, value)
+          } else {
+            queue.enqueue(new InflightCompletedFutureSource[T](elem))
+            pendingInflightSources += 1
+          }
+        case Failure(ex) => failStage(ex)
+      }
+
+      private def addPendingFutureElem(future: Future[T]): Unit = {
+        val inflightSource = new InflightPendingFutureSource[T](invokeCb)
+        future.onComplete(inflightSource)(scala.concurrent.ExecutionContext.parasitic)
+        // Future is not yet ready; occupy a breadth slot until completion
+        pendingInflightSources += 1
+      }
+
+      private def attachAndMaterializeSource(source: Graph[SourceShape[T], M]): Unit = {
+        val sinkIn = new SubSinkInlet[T]("FlattenMergeSink")
+        sinkIn.setHandler(new InHandler {
+          override def onPush(): Unit = {
+            if (isAvailable(out)) {
+              push(out, sinkIn.grab())
+              sinkIn.pull()
+            } else {
+              queue.enqueue(sinkIn)
+            }
+          }
+          override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn)
+        })
+        sinkIn.pull()
+        sources += sinkIn
+        val graph = Source.fromGraph(source).to(sinkIn.sink)
+        interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes)
+      }
+
       def removeSource(src: AnyRef): Unit = {
         val pullSuppressed = activeSources == breadth
         src match {
@@ -137,13 +238,26 @@ import pekko.util.OptionVal
             sources -= sub
           case _: SingleSource[_] =>
             pendingSingleSources -= 1
+          case _: InflightSource[_] =>
+            pendingInflightSources -= 1
           case other => throw new IllegalArgumentException(s"Unexpected source type: '${other.getClass}'")
         }
         if (pullSuppressed) tryPull(in)
         if (activeSources == 0 && isClosed(in)) completeStage()
       }
 
-      override def postStop(): Unit = sources.foreach(_.cancel())
+      override def postStop(): Unit = {
+        sources.foreach(_.cancel())
+        // Cancel any queued inflight sources so close-sensitive resources (e.g. JavaStream
+        // backed by IO) are released even if downstream cancels mid-flight.
+        while (queue.nonEmpty) {
+          queue.dequeue() match {
+            case inflight: InflightSource[T] @unchecked =>
+              inflight.cancel(SubscriptionWithCancelException.NoMoreElementsNeeded)
+            case _ => // SubSinkInlet already cancelled above; SingleSource needs no cleanup
+          }
+        }
+      }
 
     }
 

