This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 415c4ae760 optimize: extend internalConcat dispatch for value-presented sources (#2978)
415c4ae760 is described below
commit 415c4ae76024b2e4d1873efc9c028c9ea838757a
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Thu May 21 07:02:43 2026 +0800
optimize: extend internalConcat dispatch for value-presented sources (#2978)
* optimize: extend internalConcat dispatch for value-presented sources
Motivation:
PR #2977 added value-presented source dispatch for FlattenConcat /
FlattenMerge so that materializing a substream is skipped when the inner
source already carries its element(s) inline. The same opportunity exists
for lazy concatenation (`concatLazy`), where the second source is often a
`Source.single`, `Source.future`, an iterable, a range, a Java stream,
`Source.repeat`, or a failed source. Materializing a fan-in graph for any
of these is pure overhead.
`concat` (the eager / detached variant) intentionally retains the
substream-materializing path so that `Concat(_, detachedInputs = true)`
+ `Detacher` semantics — eager pull at materialization, one-element
prefetch buffer, deadlock-breaking for cyclic graphs — are preserved.
Modification:
- `Flow.internalConcat` (Scala DSL): when `detached = false` (i.e.
`concatLazy`) and `TraversalBuilder.getValuePresentedSource` recognizes
a known kind, fuse with a dedicated lightweight stage instead of
`Concat`. When `detached = true` (i.e. `concat`), fall through to
`concatGraph` to keep the eager-pull / prefetch / cycle-deadlock
contract intact. The target stages live under `pekko.stream.impl`: the
pre-existing `SingleConcat` plus the new `IterableConcat`,
`JavaStreamConcat`, `RepeatConcat`, `FailedConcat` and `FutureConcat`.
`Source.empty` short-circuits to the
upstream Source unchanged. The inlined source's wrapping attributes
(from `Source.foo(...).withAttributes(...)`) are carried over to the
optimized stage via `addAttributes(other.traversalBuilder.attributes)`
so any user-supplied `SupervisionStrategy`, dispatcher hint, log level,
or stage name still applies on the fast path.
- `FailedConcat` calls `failStage(failure)` from `preStart` to mirror
`FailedSource.preStart` semantics. The existing `concatGraph` path
materializes `FailedSource` as a substream that fails immediately and
`Concat`'s `onUpstreamFailure` propagates eagerly; keeping the same
eager-failure timing avoids hangs on inputs that never complete (e.g.
`Source.never.concatLazy(Source.failed(ex))`).
- `JavaStreamConcat` opens the underlying `BaseStream` in `preStart` to
match `JavaStreamSource.preStart`. Side effects of `open()` (file /
network resource acquisition) and any exceptions from `open()` happen
at materialization time, matching the existing `concatGraph` path. The
stream is closed in `postStop` so cancellation, exhaustion, iterator
failure, and stage failure all release the resource (`stream` typed
`BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching
#2977's fix in `5d03002142`).
- `FutureConcat` registers the async callback in `preStart`, mirroring
`FutureSource.preStart`: a pending-future failure surfaces eagerly via
`failStage(ex)` even while upstream is still active (otherwise
`Source.never.concatLazy(Source.future(failingFuture))` would hang). A
successful value is buffered in `futureResult` and emitted only after
`onUpstreamFinish`, preserving concat ordering. `Future.value` is used
for the already-completed fast path. The pending path swaps the `out`
handler to a no-op while waiting for the callback so we don't pull the
now-closed `in`. `Success(null)` calls `completeStage()` (no null is
forwarded), staying in lock-step both with
`Source.future(Future.successful(null))` being rerouted to `Source.empty`
and with `InflightSources.hasFutureElement` treating `Success(null)` as
no-element. `DO NOT CHANGE` comments capture both invariants.
- `IterableConcat` reads
`inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider`
and applies it on iterator-creation / iterator-iteration failure,
mirroring `IterableSource` / `IteratorSource` semantics. Without this,
a Resume / Restart decider attached to the inlined source via
`withAttributes(supervisionStrategy(...))` would silently degrade to
Stop on the optimized path. Restart re-invokes the iterator factory and
resumes emission with a fresh iterator.
- Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`,
`IterableConcat` for iterables, ranges and iterators, `JavaStreamConcat`,
`RepeatConcat`, `FailedConcat`, `FutureConcat`), Java-stream
`BaseStream.close()` on exhaustion, on downstream cancellation (`take(2)`
mid-stream) and on a throwing `iterator.next()`,
pending future resolved with `Success(null)` treated as completion,
pending future resolved with `Failure(_)` failing the stream, eager
failure for `Source.never.concatLazy(Source.failed(ex))`, eager failure
for `Source.never.concatLazy(Source.future(pendingPromise))` when the
promise later fails, supervision parity for the `IterableConcat` fast
path (Resume-skips, Restart-recreates, default-Stop-fails), and the
detached-gating: a directional test asserts the fast path is not taken
for `concat` (detached = true) and another asserts that `concat`
preserves Detacher's eager pull side-effect timing for an
`IteratorSource` factory while `concatLazy` defers it. Existing eager
/ concatLazy parity matrix is preserved by gating each
`pendingBuilder.toString` assertion on `!eager`.
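The dispatch rule in the first Modification bullet can be summarised as a
pure function. This is a hypothetical sketch, not Pekko code: `OtherSource`,
its cases and `ConcatDispatchModel.plan` are illustrative names standing in
for what `TraversalBuilder.getValuePresentedSource` recognizes, and it
assumes (as the "preserved bit-for-bit" wording under Result suggests) that
`detached = true` always falls through to `concatGraph`, including for
`Source.empty`. The returned labels mirror the `toString` values the spec
asserts on.

```scala
// Hypothetical model of the internalConcat dispatch; these are NOT Pekko types.
// Each case stands in for a kind that getValuePresentedSource recognizes.
sealed trait OtherSource
case object EmptySrc extends OtherSource
final case class SingleSrc(elem: Any) extends OtherSource
case object IterableSrc extends OtherSource // iterable, range or iterator
case object JavaStreamSrc extends OtherSource
final case class RepeatSrc(elem: Any) extends OtherSource
final case class FailedSrc(ex: Throwable) extends OtherSource
case object PendingFutureSrc extends OtherSource // future not yet completed
case object OpaqueSrc extends OtherSource // not value-presented, e.g. after .map(...)

object ConcatDispatchModel {

  /** Returns a label for the stage the concat would be fused with. */
  def plan(detached: Boolean, other: OtherSource): String =
    if (detached) "concatGraph" // concat: keep Concat(detachedInputs = true) + Detacher
    else
      other match {
        case EmptySrc         => "upstream unchanged"
        case SingleSrc(e)     => s"SingleConcat($e)"
        case IterableSrc      => "IterableConcat"
        case JavaStreamSrc    => "JavaStreamConcat"
        case RepeatSrc(e)     => s"RepeatConcat($e)"
        case FailedSrc(_)     => "FailedConcat"
        case PendingFutureSrc => "FutureConcat"
        case OpaqueSrc        => "concatGraph" // unknown kind: regular fan-in graph
      }
}
```

Already-completed futures are deliberately absent: per the spec comments,
`Source.future` rewrites them to `SingleSource` / `FailedSource` first, so
they land on `SingleConcat` / `FailedConcat`.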
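The `JavaStreamConcat` resource contract (open eagerly, close on every
termination path) reduces to a try/finally. The sketch below is a
hypothetical, synchronous model rather than the stage: `JavaStreamConcatModel`
and `drain` are illustrative names, `demand` stands in for a downstream
cancel after a fixed number of elements, and `finally` plays the role of
`postStop`.

```scala
import java.util.stream.BaseStream

// Hypothetical model of JavaStreamConcat's resource handling (not the stage):
// open() runs up front like preStart, close() runs in finally like postStop.
object JavaStreamConcatModel {

  /** Emits at most `demand` elements; stopping early models a downstream cancel. */
  def drain[E](open: () => BaseStream[E, _], demand: Int): Vector[E] = {
    val stream = open() // side effects and exceptions of open() surface here
    try {
      val it = stream.iterator()
      val out = Vector.newBuilder[E]
      var taken = 0
      while (taken < demand && it.hasNext) {
        out += it.next()
        taken += 1
      }
      out.result()
    } finally stream.close() // runs on exhaustion, early stop and iterator failure
  }
}
```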
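The ordering and null rules from the `FutureConcat` bullet can be checked
with a small state machine. This is a hypothetical model, not the
`GraphStage`: `FutureConcatModel` and the `Action` cases are illustrative
names, `onFutureComplete` covers both the already-completed (`Future.value`)
and the async-callback paths, and `Wait` means "keep forwarding upstream
elements or keep waiting for the callback".

```scala
import scala.util.{ Failure, Success, Try }

// Hypothetical decisions of the stage, named for illustration only.
sealed trait Action
case object Wait extends Action
final case class EmitThenComplete(elem: Any) extends Action
case object Complete extends Action
final case class Fail(ex: Throwable) extends Action

final class FutureConcatModel[E] {
  private var futureResult: Try[E] = null // null sentinel = not resolved yet
  private var upstreamFinished = false

  /** The future resolved, either already at preStart or via the async callback. */
  def onFutureComplete(result: Try[E]): Action = result match {
    case Failure(ex) => Fail(ex) // eager: fails even while upstream is still active
    case success =>
      futureResult = success // buffered to preserve concat ordering
      if (upstreamFinished) emitOrComplete(success) else Wait
  }

  def onUpstreamFinish(): Action = {
    upstreamFinished = true
    if (futureResult ne null) emitOrComplete(futureResult) else Wait
  }

  private def emitOrComplete(result: Try[E]): Action = result match {
    case Success(null) => Complete // never emit null (Reactive Streams rule)
    case Success(v)    => EmitThenComplete(v)
    case Failure(ex)   => Fail(ex)
  }
}
```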
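The `IterableConcat` supervision parity can likewise be modeled
synchronously. In this hypothetical sketch, `Directive` with
`Stop` / `Resume` / `Restart` stands in for `pekko.stream.Supervision`
(it is not that type), and `drain` follows the two handlers in the diff:
a creation failure completes on Resume and retries the factory once on
Restart, while an iteration failure skips the element on Resume and calls
the factory again on Restart.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-ins for Supervision directives (not the Pekko types).
sealed trait Directive
case object Stop extends Directive
case object Resume extends Directive
case object Restart extends Directive

object IterableConcatModel {

  /** Right(elements) on completion, Left(ex) when the stream would fail. */
  def drain[E](factory: () => Iterator[E], decider: Throwable => Directive): Either[Throwable, Vector[E]] = {
    val emitted = Vector.newBuilder[E]

    // Creation failure: Stop fails, Resume completes empty, Restart retries once.
    val created: Either[Throwable, Option[Iterator[E]]] =
      try Right(Some(factory()))
      catch {
        case NonFatal(ex) =>
          decider(ex) match {
            case Stop    => Left(ex)
            case Resume  => Right(None)
            case Restart =>
              try Right(Some(factory()))
              catch { case NonFatal(retryEx) => Left(retryEx) }
          }
      }

    created match {
      case Left(ex)    => Left(ex)
      case Right(None) => Right(emitted.result())
      case Right(Some(first)) =>
        var it = first
        var result: Option[Either[Throwable, Vector[E]]] = None
        while (result.isEmpty) {
          try {
            if (it.hasNext) emitted += it.next()
            else result = Some(Right(emitted.result()))
          } catch {
            case NonFatal(ex) =>
              decider(ex) match {
                case Stop    => result = Some(Left(ex))
                case Resume  => () // drop the failing element, keep the same iterator
                case Restart =>
                  // Recreate the iterator; in this sketch a failing factory fails the stream.
                  try { it = factory() }
                  catch { case NonFatal(retryEx) => result = Some(Left(retryEx)) }
              }
          }
        }
        result.get
    }
  }
}
```

With the iterators used in `FlowConcatSpec`, Resume yields `1, 2, 4, 5` and
Restart yields `1, 4, 5`, i.e. the spec's expectations minus the leading
upstream `0`.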
Result:
The lazy concat operator takes the same value-presented fast path as
#2977's flatten operators while the eager / detached `concat` keeps its
documented Detacher semantics. No substream is materialized for the
inlined kinds on the lazy path; the Java-stream resource is closed
deterministically; the failed / future eager-failure timing matches the
existing concat-graph path so inputs that never complete cannot hang;
the pending-future null path is consistent with `Source.future` and the
flatten inflight path; supervision deciders attached to inlined
iterables / iterators flow through to the optimized stage; and the
detached-true contract is preserved bit-for-bit by falling through to
`concatGraph`. All FlowConcat suites pass, plus all FlattenMerge /
FlatMapConcat regression tests.
Tests: stream-tests/testOnly *FlowConcat*Spec *Concat* *FlattenMerge*
*FlatMapConcat*
References: PR #2977 (value-presented optimization for FlattenConcat /
FlattenMerge); PR #2978 (this work)
* .
---
.../pekko/stream/scaladsl/FlowConcatSpec.scala | 309 ++++++++++++++++++++-
.../apache/pekko/stream/impl/FailedConcat.scala | 60 ++++
.../apache/pekko/stream/impl/FutureConcat.scala | 100 +++++++
.../apache/pekko/stream/impl/IterableConcat.scala | 108 +++++++
.../pekko/stream/impl/JavaStreamConcat.scala | 80 ++++++
.../apache/pekko/stream/impl/RepeatConcat.scala | 59 ++++
.../org/apache/pekko/stream/scaladsl/Flow.scala | 54 +++-
7 files changed, 763 insertions(+), 7 deletions(-)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala
index acae1ae812..2b2deb052c 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala
@@ -13,14 +13,21 @@
package org.apache.pekko.stream.scaladsl
+import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
+import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
+import scala.util.control.NoStackTrace
import org.apache.pekko
import pekko.NotUsed
+import pekko.stream.ActorAttributes.supervisionStrategy
+import pekko.stream.KillSwitches
+import pekko.stream.Supervision.{ restartingDecider, resumingDecider }
import pekko.stream.testkit.BaseTwoStreamsSetup
import pekko.stream.testkit.TestPublisher
import pekko.stream.testkit.TestSubscriber
@@ -242,11 +249,309 @@ abstract class AbstractFlowConcatSpec extends BaseTwoStreamsSetup {
val s2 = Source.single(2)
val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
- // avoids digging too deap into the traversal builder
- concat.traversalBuilder.pendingBuilder.toString should include("SingleConcat(2)")
+ // avoids digging too deep into the traversal builder; fast path is gated on detached=false.
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("SingleConcat(2)")
concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
}
+
+ "optimize iterable concat" in {
+ val s1 = Source.single(1)
+ val s2 = Source(List(2, 3, 4))
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+ concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 4))
+ }
+
+ "optimize range concat" in {
+ val s1 = Source.single(1)
+ val s2 = Source(2 to 4)
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+ concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 4))
+ }
+
+ "optimize iterator concat" in {
+ val s1 = Source.single(1)
+ val s2 = Source.fromIterator(() => Iterator(2, 3, 4))
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+ concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 4))
+ }
+
+ "optimize java-stream concat" in {
+ val s1 = Source.single(1)
+ val s2 = Source.fromJavaStream(() => Collections.singleton(2: Integer).stream()).map(_.intValue())
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ // map() is not value-presented, so the optimization should not kick in for s2 here.
+ // To exercise the optimization, build a JavaStream source whose value-presented form survives.
+ val s2Direct = Source.fromJavaStream(() => Collections.singleton(2: Integer).stream())
+ val concatDirect: Source[Integer, _] =
+ if (eager) Source.single[Integer](1).concat(s2Direct) else Source.single[Integer](1).concatLazy(s2Direct)
+ if (!eager) concatDirect.traversalBuilder.pendingBuilder.toString should include("JavaStreamConcat")
+
+ concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
+ }
+
+ "close the underlying java-stream after java-stream concat completes" in {
+ // Resource hygiene parity with InflightJavaStreamSource: BaseStream.close()
+ // must run on stage exhaustion (mirrors postStop in JavaStreamConcat).
+ val closed = new AtomicBoolean(false)
+ val s1 = Source.single(1)
+ val s2 = Source.fromJavaStream { () =>
+ Collections.singleton(2: Integer).stream().onClose(new Runnable {
+ override def run(): Unit = closed.set(true)
+ })
+ }
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("JavaStreamConcat")
+ concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
+ awaitAssert {
+ closed.get() shouldBe true
+ }
+ }
+
+ "close the underlying java-stream when downstream cancels mid java-stream concat" in {
+ val closed = new AtomicBoolean(false)
+ val s1 = Source.single(1)
+ val s2 = Source.fromJavaStream { () =>
+ java.util.stream.IntStream.rangeClosed(2, 1000).boxed().asInstanceOf[java.util.stream.Stream[Integer]]
+ .onClose(new Runnable {
+ override def run(): Unit = closed.set(true)
+ })
+ }
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("JavaStreamConcat")
+ // take(2) only consumes the upstream element + first element of the java-stream, then cancels.
+ concat.take(2).runWith(Sink.seq).futureValue should ===(Seq(1, 2))
+ // postStop runs asynchronously after take(2)'s cancel propagates upstream.
+ awaitAssert {
+ closed.get() shouldBe true
+ }
+ }
+
+ "close the underlying java-stream when iterator.next() throws in java-stream concat" in {
+ // Resource hygiene parity with InflightJavaStreamSource: even when user code
+ // in iterator.next() throws, postStop must close the underlying BaseStream.
+ val closed = new AtomicBoolean(false)
+ val ex = new RuntimeException("iterator-boom") with NoStackTrace
+ val s1 = Source.single(1)
+ val s2 = Source.fromJavaStream { () =>
+ // Build a stream whose iterator.next() throws on the first call.
+ java.util.stream.Stream.of[Integer](2: Integer).map[Integer](new java.util.function.Function[Integer, Integer] {
+ override def apply(t: Integer): Integer = throw ex
+ }).onClose(new Runnable {
+ override def run(): Unit = closed.set(true)
+ })
+ }
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("JavaStreamConcat")
+ concat.runWith(Sink.seq).failed.futureValue should ===(ex)
+ awaitAssert {
+ closed.get() shouldBe true
+ }
+ }
+
+ "optimize repeat concat" in {
+ val s1 = Source(1 to 3)
+ val s2 = Source.repeat(0)
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("RepeatConcat(0)")
+ concat.take(6).runWith(Sink.seq).futureValue should ===(Seq(1, 2, 3, 0, 0, 0))
+ }
+
+ "optimize failed concat" in {
+ val ex = new RuntimeException("boom") with NoStackTrace
+ val s1 = Source.single(1)
+ val s2: Source[Int, NotUsed] = Source.failed(ex)
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FailedConcat")
+ concat.runWith(Sink.seq).failed.futureValue should ===(ex)
+ }
+
+ "optimize completed-future concat" in {
+ // `Source.future(Future.successful(x))` is itself optimized to a `SingleSource`,
+ // so the dispatch lands on `SingleConcat` rather than `FutureConcat`.
+ val s1 = Source.single(1)
+ val s2 = Source.future(Future.successful(2))
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("SingleConcat(2)")
+ concat.runWith(Sink.seq).futureValue should ===(Seq(1, 2))
+ }
+
+ "optimize pending-future concat" in {
+ val promise = Promise[Int]()
+ val s1 = Source.single(1)
+ val s2 = Source.future(promise.future)
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FutureConcat")
+ val resultF = concat.runWith(Sink.seq)
+ promise.success(2)
+ resultF.futureValue should ===(Seq(1, 2))
+ }
+
+ "treat pending-future resolved with null as completion in concat" in {
+ // Lockstep with `Source.future(Future.successful(null))` (rerouted to `Source.empty`)
+ // and `InflightSources.hasFutureElement`: a pending future that later resolves
+ // with `null` must be treated as completion, never emitted.
+ val promise = Promise[String]()
+ val s1 = Source.single("a")
+ val s2 = Source.future(promise.future)
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FutureConcat")
+ val resultF = concat.runWith(Sink.seq)
+ promise.success(null)
+ resultF.futureValue should ===(Seq("a"))
+ }
+
+ "fail the stream when pending-future resolves with failure in concat" in {
+ val ex = new RuntimeException("pending-future-boom") with NoStackTrace
+ val promise = Promise[Int]()
+ val s1 = Source.single(1)
+ val s2 = Source.future(promise.future)
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FutureConcat")
+ val resultF = concat.runWith(Sink.seq)
+ promise.failure(ex)
+ resultF.failed.futureValue should ===(ex)
+ }
+
+ "optimize failed-future concat" in {
+ // `Source.future(Future.failed(ex))` is itself optimized to a `FailedSource`,
+ // so the dispatch lands on `FailedConcat` rather than `FutureConcat`.
+ val ex = new RuntimeException("future-boom") with NoStackTrace
+ val s1 = Source.single(1)
+ val s2 = Source.future(Future.failed[Int](ex))
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FailedConcat")
+ concat.runWith(Sink.seq).failed.futureValue should ===(ex)
+ }
+
+ "fail eagerly when concat with a failed source even if upstream never finishes" in {
+ // Mirrors `FailedSource.preStart` semantics: the failure must surface at materialization
+ // time, otherwise `Source.never.concat(Source.failed(ex))` would hang.
+ val ex = new RuntimeException("failed-source-eager") with NoStackTrace
+ val concat = if (eager) Source.never[Int].concat(Source.failed(ex))
+ else Source.never[Int].concatLazy(Source.failed(ex))
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FailedConcat")
+ concat.runWith(Sink.head).failed.futureValue should ===(ex)
+ }
+
+ "fail eagerly when concat with a pending future that fails before upstream finishes" in {
+ // Mirrors `FutureSource.preStart` semantics: a pending-future failure must propagate
+ // even if upstream is still active. Without eager failure, a never-completing upstream
+ // combined with a failing future would hang.
+ val ex = new RuntimeException("pending-future-eager") with NoStackTrace
+ val promise = Promise[Int]()
+ val concat = if (eager) Source.never[Int].concat(Source.future(promise.future))
+ else Source.never[Int].concatLazy(Source.future(promise.future))
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("FutureConcat")
+ val resultF = concat.runWith(Sink.head)
+ promise.failure(ex)
+ resultF.failed.futureValue should ===(ex)
+ }
+
+ "honor supervision Resume decider when the inlined iterator throws" in {
+ // Mirrors `IteratorSource.handleIteratorFailure`: a Resume decider must skip the
+ // throwing element and continue, instead of failing the stage. Drives the IterableConcat
+ // fast path for `Source.fromIterator(...).withAttributes(supervisionStrategy(...))`.
+ val ex = new RuntimeException("iter-resume-3") with NoStackTrace
+ val s2 = Source
+ .fromIterator(() => Iterator(1, 2, 3, 4, 5).map(v => if (v == 3) throw ex else v))
+ .withAttributes(supervisionStrategy(resumingDecider))
+ val concat = if (eager) Source.single(0).concat(s2) else Source.single(0).concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+ concat.runWith(Sink.seq).futureValue should ===(Seq(0, 1, 2, 4, 5))
+ }
+
+ "honor supervision Restart decider by recreating the iterator after throw" in {
+ // Mirrors `IteratorSource.handleIteratorFailure` with Restart: on iterator failure,
+ // the iterator factory is invoked again and emission continues with the fresh iterator.
+ val ex = new RuntimeException("iter-restart") with NoStackTrace
+ val attempt = new AtomicInteger(0)
+ val factory: () => Iterator[Int] = () => {
+ val n = attempt.incrementAndGet()
+ if (n == 1) Iterator(1, 2, 3).map(v => if (v == 2) throw ex else v)
+ else Iterator(4, 5)
+ }
+ val s2 = Source.fromIterator(factory).withAttributes(supervisionStrategy(restartingDecider))
+ val concat = if (eager) Source.single(0).concat(s2) else Source.single(0).concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+ concat.runWith(Sink.seq).futureValue should ===(Seq(0, 1, 4, 5))
+ }
+
+ "fail when the inlined iterator throws and the default (Stop) decider applies" in {
+ // Default decider is `stoppingDecider`; without an explicit `withAttributes`, an
+ // iterator throw must fail the concat stream — same as `IterableSource` default behaviour.
+ val ex = new RuntimeException("iter-stop") with NoStackTrace
+ val s2 = Source.fromIterator(() => Iterator(1, 2, 3).map(v => if (v == 2) throw ex else v))
+ val concat = if (eager) Source.single(0).concat(s2) else Source.single(0).concatLazy(s2)
+
+ if (!eager) concat.traversalBuilder.pendingBuilder.toString should include("IterableConcat")
+ concat.runWith(Sink.seq).failed.futureValue should ===(ex)
+ }
+
+ "gate the value-presented fast path on detached=false (concatLazy only)" in {
+ // The fast path is intentionally bypassed for `concat` (detached = true) so that the
+ // `Concat(_, detachedInputs = true)` + `Detacher` semantics — eager pull at
+ // materialization, one-element prefetch buffer, deadlock-breaking for cyclic graphs —
+ // remain in force. `concatLazy` (detached = false) does take the inlined fast path.
+ val s1 = Source.single(1)
+ val s2 = Source(List(2, 3))
+ val eagerStr = s1.concat(s2).traversalBuilder.pendingBuilder.toString
+ val lazyStr = s1.concatLazy(s2).traversalBuilder.pendingBuilder.toString
+ (eagerStr should not).include("IterableConcat")
+ lazyStr should include("IterableConcat")
+ }
+
+ "preserve `concat` (detached=true) eager-pull side-effect timing for IteratorSource" in {
+ // `Detacher.preStart` does `tryPull(in)` which causes IteratorSource.onPull to invoke
+ // the user-supplied factory at materialization time, even if the LHS upstream never
+ // finishes. The fast path inlines IteratorSource into IterableConcat which delays the
+ // factory call until upstream finishes; gating on detached=false keeps detached
+ // semantics intact for `concat`.
+ val factoryRan = Promise[Unit]()
+ val factory: () => Iterator[Int] = () => {
+ factoryRan.trySuccess(())
+ Iterator.empty[Int]
+ }
+ val s1 = Source.never[Int]
+ val s2 = Source.fromIterator(factory)
+ val concat = if (eager) s1.concat(s2) else s1.concatLazy(s2)
+ val (killSwitch, _) =
+ concat.viaMat(KillSwitches.single)(Keep.right).toMat(Sink.ignore)(Keep.both).run()
+ try {
+ if (eager) {
+ // Factory must run at materialization (Detacher's eager pull).
+ factoryRan.future.futureValue
+ } else {
+ // Factory must NOT run while LHS (Source.never) hasn't completed.
+ try {
+ Await.ready(factoryRan.future, 200.millis)
+ fail("Factory unexpectedly ran with concatLazy")
+ } catch {
+ case _: java.util.concurrent.TimeoutException => () // expected
+ }
+ }
+ } finally {
+ killSwitch.shutdown()
+ }
+ }
}
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala
new file mode 100644
index 0000000000..00efb5f40d
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
+
+/**
+ * Concatenating a `Source.failed` to a stream is common enough that it warrants this
+ * optimization which avoids the actual fan-out for such cases. Mirrors `FailedSource`
+ * timing semantics: `preStart()` calls `failStage(failure)`, so the failure surfaces
+ * at materialization time even if upstream is still active (e.g. `Source.never`).
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class FailedConcat[E](failure: Throwable) extends GraphStage[FlowShape[E, E]] {
+
+ val in = Inlet[E]("FailedConcat.in")
+ val out = Outlet[E]("FailedConcat.out")
+
+ override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ // DO NOT CHANGE
+ // WHY: matches FailedSource.preStart which fails at materialization time. The
+ // existing concatGraph path materializes FailedSource as a substream whose
+ // preStart fails immediately and Concat's onUpstreamFailure propagates it
+ // eagerly. Keeping the same eager-failure timing avoids hangs for inputs that
+ // never complete (e.g. Source.never.concat(Source.failed(ex))).
+ override def preStart(): Unit = failStage(failure)
+
+ // Handlers are required by GraphStageLogic; preStart fails the stage before
+ // any of these can fire.
+ override def onPush(): Unit = ()
+ override def onPull(): Unit = ()
+
+ setHandlers(in, out, this)
+ }
+
+ override def toString: String = s"FailedConcat($failure)"
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala
new file mode 100644
index 0000000000..509fb1cf21
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.util.{ Failure, Success, Try }
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
+
+/**
+ * Concatenating a `Source.future` to a stream is common enough that it warrants this
+ * optimization which avoids the actual fan-out for such cases. Mirrors `FutureSource`
+ * timing semantics: the future callback is registered in `preStart()` (so a Failure
+ * surfaces eagerly even while upstream is still active), while a successful value is
+ * buffered and emitted only after upstream finishes — preserving concat ordering.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class FutureConcat[E](future: Future[E]) extends GraphStage[FlowShape[E, E]] {
+
+ val in = Inlet[E]("FutureConcat.in")
+ val out = Outlet[E]("FutureConcat.out")
+
+ override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ // null sentinel = not yet resolved (cheaper than Option allocation per stage)
+ private var futureResult: Try[E] = null
+ private var upstreamFinished: Boolean = false
+
+ // DO NOT CHANGE
+ // WHY: matches FutureSource.preStart timing. A pending future failure must
+ // surface at materialization time, not after upstream completes — otherwise
+ // `Source.never.concat(Source.future(failingFuture))` would hang. Successful
+ // values are buffered until upstream finishes to preserve concat ordering.
+ override def preStart(): Unit =
+ future.value match {
+ case Some(Failure(ex)) => failStage(ex)
+ case completed @ Some(_) => futureResult = completed.get
+ case None =>
+ val cb = getAsyncCallback[Try[E]] {
+ case Failure(ex) => failStage(ex)
+ case other =>
+ futureResult = other
+ if (upstreamFinished) emitOrComplete(other)
+ }.invoke _
+ future.onComplete(cb)(ExecutionContext.parasitic)
+ }
+
+ override def onPush(): Unit = push(out, grab(in))
+
+ override def onPull(): Unit = pull(in)
+
+ override def onUpstreamFinish(): Unit = {
+ upstreamFinished = true
+ if (futureResult ne null) emitOrComplete(futureResult)
+ else
+ // Avoid pulling the now-closed `in` while the future is pending; the
+ // async callback above will emit/fail when it fires.
+ setHandler(out, new OutHandler { override def onPull(): Unit = () })
+ }
+
+ private def emitOrComplete(result: Try[E]): Unit = result match {
+ // DO NOT CHANGE
+ // WHY: Source.future(Future.successful(null)) is rerouted upstream to Source.empty,
+ // and InflightSources.hasFutureElement treats Success(null) as no-element. This
+ // branch keeps the optimized concat fast-path consistent with both — emitting
+ // null here would violate Reactive Streams' no-null rule and diverge from the
+ // materialized FutureSource path. Update in lock-step if Source.future ever
+ // changes its null treatment.
+ case Success(null) => completeStage()
+ case Success(v) => emit(out, v, () => completeStage())
+ case Failure(ex) => failStage(ex)
+ }
+
+ setHandlers(in, out, this)
+ }
+
+ override def toString: String = "FutureConcat"
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala
new file mode 100644
index 0000000000..ddaeddc933
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet, Supervision }
+import pekko.stream.ActorAttributes.SupervisionStrategy
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
+
+/**
+ * Concatenating an iterable / iterator / range source to a stream is common enough
+ * that it warrants this optimization which avoids the actual fan-out for value-presented
+ * sources. Java-stream-backed sources are handled by [[JavaStreamConcat]] because they
+ * also need deterministic `BaseStream.close()` on stage termination.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class IterableConcat[E](createIterator: () => Iterator[E]) extends GraphStage[FlowShape[E, E]] {
+
+ val in = Inlet[E]("IterableConcat.in")
+ val out = Outlet[E]("IterableConcat.out")
+
+ override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ // DO NOT CHANGE
+ // WHY: matches `IterableSource` / `IteratorSource` supervision semantics so
+ // `Source.iterable / fromIterator(...).withAttributes(supervisionStrategy(...))`
+ // behaves identically whether dispatched through this fast path or through the
+ // substream-materializing concatGraph path. Without this, a Resume / Restart
+ // decider attached to the inlined source would silently degrade to Stop.
+ private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+ private var currentIterator: Iterator[E] = _
+ private var iterating: Boolean = false
+
+ override def onPush(): Unit = push(out, grab(in))
+
+ override def onPull(): Unit =
+ if (!iterating) pull(in)
+ else tryPushNextOrComplete()
+
+ override def onUpstreamFinish(): Unit = {
+ iterating = true
+ if (isAvailable(out)) tryPushNextOrComplete()
+ }
+
+ private def tryPushNextOrComplete(): Unit =
+ try {
+ if (currentIterator eq null) currentIterator = createIterator()
+ if (currentIterator.hasNext) {
+ if (isAvailable(out)) {
+ push(out, currentIterator.next())
+ if (!currentIterator.hasNext) completeStage()
+ }
+ } else {
+ completeStage()
+ }
+ } catch {
+ case NonFatal(ex) =>
+ if (currentIterator eq null) handleIteratorCreationFailure(ex)
+ else handleIteratorFailure(ex)
+ }
+
+ private def handleIteratorCreationFailure(ex: Throwable): Unit = decider(ex) match {
+ case Supervision.Stop => failStage(ex)
+ case Supervision.Resume => completeStage()
+ case Supervision.Restart =>
+ try {
+ currentIterator = createIterator()
+ tryPushNextOrComplete()
+ } catch {
+ case NonFatal(restartEx) => failStage(restartEx)
+ }
+ }
+
+ private def handleIteratorFailure(ex: Throwable): Unit = decider(ex) match {
+ case Supervision.Stop => failStage(ex)
+ case Supervision.Resume => tryPushNextOrComplete()
+ case Supervision.Restart =>
+ currentIterator = createIterator()
+ tryPushNextOrComplete()
+ }
+
+ setHandlers(in, out, this)
+ }
+
+ override def toString: String = "IterableConcat"
+}
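The supervision mapping above can be illustrated without Pekko. The following is a minimal plain-Scala sketch, not the stage itself: `SupervisedDrain`, its `Directive` cases and `drainWithSupervision` are hypothetical stand-ins for `Supervision.Directive` and the stage's failure handlers, and demand/backpressure is not modelled. It assumes the rules encoded by `handleIteratorCreationFailure` and `handleIteratorFailure`: a failing `createIterator()` stops, completes empty (Resume) or is retried once (Restart); a failure while iterating stops, keeps the current iterator (Resume) or replaces it with a fresh one (Restart).

```scala
import scala.util.control.NonFatal

object SupervisedDrain {
  // Hypothetical stand-ins for pekko.stream.Supervision.{ Stop, Resume, Restart }.
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  /** Drains the iterator produced by `create`, applying `decider` the way the
    * stage's failure handlers do. Returns the emitted elements and the failure
    * (if any) that stopped the drain. Backpressure is not modelled. */
  def drainWithSupervision[E](
      create: () => Iterator[E],
      decider: Throwable => Directive): (Vector[E], Option[Throwable]) = {
    val emitted = Vector.newBuilder[E]

    // Iterator creation: Stop fails, Resume completes empty, Restart retries once.
    val initial: Either[Throwable, Iterator[E]] =
      try Right(create())
      catch {
        case NonFatal(ex) =>
          decider(ex) match {
            case Stop    => Left(ex)
            case Resume  => Right(Iterator.empty)
            case Restart =>
              try Right(create())
              catch { case NonFatal(retryEx) => Left(retryEx) }
          }
      }

    initial match {
      case Left(ex) => (emitted.result(), Some(ex))
      case Right(first) =>
        var current = first
        var failure: Option[Throwable] = None
        var done = false
        while (!done) {
          try {
            if (current.hasNext) emitted += current.next()
            else done = true
          } catch {
            // Iteration failure: Stop fails, Resume keeps the same iterator,
            // Restart replaces it with a fresh one (a failing create() propagates).
            case NonFatal(ex) =>
              decider(ex) match {
                case Stop    => failure = Some(ex); done = true
                case Resume  => ()
                case Restart => current = create()
              }
          }
        }
        (emitted.result(), failure)
    }
  }
}
```

With `_ => SupervisedDrain.Resume` and an iterator such as `Iterator(1, 2, 3).map(x => if (x == 2) throw boom else x)`, the drain yields `1` and `3` with no failure, because `Iterator.map` has already advanced past the failing element when the mapping function throws. A Restart decider on an iterator that fails deterministically never terminates in this model.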
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamConcat.scala
new file mode 100644
index 0000000000..7b399b43a5
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamConcat.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import scala.jdk.CollectionConverters._
+import scala.util.control.NonFatal
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
+
+/**
+ * Concatenating a `Source.fromJavaStream` to a stream is common enough that it warrants
+ * this optimization which avoids the actual fan-in for such cases. Mirrors the
+ * resource-management contract of [[JavaStreamSource]]: the underlying `BaseStream` is
+ * opened in `preStart` (so any side effects / exceptions from `open()` happen at
+ * materialization time, matching the existing concatGraph path) and is guaranteed to
+ * be closed via `postStop` on exhaustion, on iterator failure, on downstream cancel,
+ * and on stage failure.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class JavaStreamConcat[E](open: () => java.util.stream.BaseStream[E, _])
+ extends GraphStage[FlowShape[E, E]] {
+
+ val in = Inlet[E]("JavaStreamConcat.in")
+ val out = Outlet[E]("JavaStreamConcat.out")
+
+ override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ private var stream: java.util.stream.BaseStream[E, _] = _
+ private var iterator: Iterator[E] = _
+
+ // DO NOT CHANGE
+ // WHY: matches JavaStreamSource.preStart which opens the stream at materialization
+ // time. Side effects of `open()` (e.g. file/network resource acquisition) and any
+ // exceptions it throws happen at the same point as the existing concatGraph path.
+ // Deferring open() to onUpstreamFinish would observably delay these effects.
+ override def preStart(): Unit = {
+ stream = open()
+ iterator = stream.iterator().asScala
+ }
+
+ override def onPush(): Unit = push(out, grab(in))
+
+ override def onPull(): Unit = pull(in)
+
+ override def onUpstreamFinish(): Unit =
+ emitMultiple(out, iterator, () => completeStage())
+
+ override def postStop(): Unit =
+ if (stream ne null) {
+ try stream.close()
+ catch { case NonFatal(_) => () }
+ }
+
+ setHandlers(in, out, this)
+ }
+
+ override def toString: String = "JavaStreamConcat"
+}
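The resource contract described in the `JavaStreamConcat` Scaladoc (open before any element is read, always close afterwards, ignore close failures) can be sketched with plain `java.util.stream`. `JavaStreamDrain.drainAndClose` is a hypothetical helper, and `try`/`finally` only approximates `postStop`, which in the real stage also runs on downstream cancellation and stage failure.

```scala
import java.util.stream.BaseStream
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

object JavaStreamDrain {
  /** Hypothetical helper mirroring JavaStreamConcat's resource contract:
    * `open()` runs before any element is read (like `preStart`), elements are
    * read through the stream's iterator, and `close()` is always attempted
    * afterwards with close failures swallowed (like `postStop`). */
  def drainAndClose[E](open: () => BaseStream[E, _]): Vector[E] = {
    val stream = open()
    try stream.iterator().asScala.toVector
    finally {
      try stream.close()
      catch { case NonFatal(_) => () } // a failing close handler is ignored
    }
  }
}
```

For example, draining `java.util.stream.Stream.of("a", "b", "c")` with an `onClose` handler returns the three elements and then runs the handler; a handler that throws does not affect the result.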
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/RepeatConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/RepeatConcat.scala
new file mode 100644
index 0000000000..2ea912e0a1
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/RepeatConcat.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
+
+/**
+ * Concatenating a `Source.repeat` to a stream is common enough that it warrants this
+ * optimization which avoids the actual fan-in for such cases. After upstream
+ * finishes, the stage indefinitely emits the cached element on every pull until
+ * downstream cancels.
+ *
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] final class RepeatConcat[E](elem: E) extends GraphStage[FlowShape[E, E]] {
+
+ val in = Inlet[E]("RepeatConcat.in")
+ val out = Outlet[E]("RepeatConcat.out")
+
+ override val shape: FlowShape[E, E] = FlowShape(in, out)
+
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with InHandler with OutHandler {
+ override def onPush(): Unit = push(out, grab(in))
+
+ override def onPull(): Unit = pull(in)
+
+ override def onUpstreamFinish(): Unit = {
+ setHandler(out,
+ new OutHandler {
+ override def onPull(): Unit = push(out, elem)
+ })
+ if (isAvailable(out)) push(out, elem)
+ }
+
+ setHandlers(in, out, this)
+ }
+
+ override def toString: String = s"RepeatConcat($elem)"
+}
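Concretely, a shape such as `Source(List(1, 2)).concatLazy(Source.repeat(0)).take(5)` is the kind of graph that the dispatch in `internalConcat` would route to `RepeatConcat`. Its element order can be modelled with plain iterators; `RepeatConcatModel.concatRepeat` is hypothetical, and downstream cancellation is modelled as the consumer simply stopping its pulls (here via `take`).

```scala
object RepeatConcatModel {
  /** Hypothetical pull-based model of RepeatConcat: upstream elements are
    * passed through first; once upstream is exhausted, every further pull
    * yields the cached `elem`. The result is infinite, so the consumer
    * must bound it (the analogue of downstream cancelling). */
  def concatRepeat[E](upstream: Iterator[E], elem: E): Iterator[E] =
    upstream ++ Iterator.continually(elem)
}
```

Here `RepeatConcatModel.concatRepeat(Iterator(1, 2), 0).take(5).toList` gives `List(1, 2, 0, 0, 0)`.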
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 6e498d1ca5..c670c8c76a 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -32,8 +32,15 @@ import pekko.event.LoggingAdapter
import pekko.event.MarkerLoggingAdapter
import pekko.stream._
import pekko.stream.Attributes.SourceLocation
+import pekko.stream.impl.FailedConcat
+import pekko.stream.impl.FailedSource
+import pekko.stream.impl.FutureConcat
+import pekko.stream.impl.IterableConcat
+import pekko.stream.impl.JavaStreamConcat
+import pekko.stream.impl.JavaStreamSource
import pekko.stream.impl.LinearTraversalBuilder
import pekko.stream.impl.ProcessorModule
+import pekko.stream.impl.RepeatConcat
import pekko.stream.impl.SetupFlowStage
import pekko.stream.impl.SingleConcat
import pekko.stream.impl.Stages.DefaultAttributes
@@ -44,6 +51,7 @@ import pekko.stream.impl.TraversalBuilder
import pekko.stream.impl.fusing
import pekko.stream.impl.fusing._
import pekko.stream.impl.fusing.FlattenMerge
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, RepeatSource, SingleSource }
import pekko.stream.stage._
import pekko.util.ConstantFun
import pekko.util.OptionVal
@@ -3808,11 +3816,47 @@ trait FlowOps[+Out, +Mat] {
that match {
case source if TraversalBuilder.isEmptySource(source) =>
this.asInstanceOf[Repr[U]]
case other =>
- TraversalBuilder.getSingleSource(other) match {
- case OptionVal.Some(singleSource) =>
- via(new SingleConcat(singleSource.elem.asInstanceOf[U]))
- case _ => via(concatGraph(other, detached))
- }
+ // DO NOT CHANGE
+ // WHY: only `concatLazy` (detached = false) takes the inlined fast path.
+ // `concat` (detached = true) relies on `Concat(_, detachedInputs = true)` which
+ // wraps each input port with `Detacher`. Detacher.preStart does an eager
+ // `tryPull(in)`, which (a) pulls the LHS upstream at materialization, (b) pulls
+ // the RHS source eagerly so its first element is produced and buffered before
+ // the LHS finishes, and (c) provides a one-element buffer that decouples
+ // upstream/downstream demand (also breaks deadlocks in cyclic graphs).
+ // Inlining the RHS into a single linear stage drops both eager-pull side-effect
+ // timing and the buffer, so for `detached = true` we keep the substream path.
+ if (!detached) {
+ TraversalBuilder.getValuePresentedSource(other) match {
+ case OptionVal.Some(graph) =>
+ // Attributes attached to the inlined source via `withAttributes` (notably
+ // `SupervisionStrategy`, dispatcher hints, log levels) are carried over to
+ // the optimized stage so the value-presented fast path stays behaviourally
+ // identical to the substream-materializing concatGraph path.
+ val sourceAttrs = other.traversalBuilder.attributes
+ graph match {
+ case single: SingleSource[U] @unchecked =>
+ via(new SingleConcat(single.elem).addAttributes(sourceAttrs))
+ case iterable: IterableSource[U] @unchecked =>
+ via(new IterableConcat[U](() => iterable.elements.iterator).addAttributes(sourceAttrs))
+ case iterator: IteratorSource[U] @unchecked =>
+ via(new IterableConcat[U](iterator.createIterator).addAttributes(sourceAttrs))
+ case range: RangeSource[U] @unchecked =>
+ via(new IterableConcat[U](() => range.range.iterator.asInstanceOf[Iterator[U]]).addAttributes(
+ sourceAttrs))
+ case javaStream: JavaStreamSource[U, _] @unchecked =>
+ via(new JavaStreamConcat[U](javaStream.open).addAttributes(sourceAttrs))
+ case repeat: RepeatSource[U] @unchecked =>
+ via(new RepeatConcat[U](repeat.elem).addAttributes(sourceAttrs))
+ case futureSource: FutureSource[U] @unchecked =>
+ via(new FutureConcat[U](futureSource.future).addAttributes(sourceAttrs))
+ case failed: FailedSource[U] @unchecked =>
+ via(new FailedConcat[U](failed.failure).addAttributes(sourceAttrs))
+ case _ => via(concatGraph(other, detached))
+ }
+ case _ => via(concatGraph(other, detached))
+ }
+ } else via(concatGraph(other, detached))
}
private def internalConcatAll[U >: Out](those: Array[Graph[SourceShape[U], _]], detached: Boolean): Repr[U] =
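The decision table implemented by `internalConcat` can be summarised as a small pure function. This is a toy model over a reduced set of hypothetical source kinds (`ConcatDispatchModel` and its `Src` cases are not Pekko types; the iterator, range, Java-stream and future kinds are omitted). It only encodes the ordering visible in the diff: empty sources short-circuit for both operators, `concat` (`detached = true`) always falls back to `concatGraph`, and `concatLazy` picks a dedicated stage for recognised kinds.

```scala
object ConcatDispatchModel {
  // Hypothetical stand-ins for the source kinds recognised by the dispatch.
  sealed trait Src[+E]
  case object EmptySrc extends Src[Nothing]
  final case class SingleSrc[E](elem: E) extends Src[E]
  final case class IterableSrc[E](elems: Iterable[E]) extends Src[E]
  final case class RepeatSrc[E](elem: E) extends Src[E]
  final case class FailedSrc(ex: Throwable) extends Src[Nothing]
  final case class OtherSrc(description: String) extends Src[Nothing]

  /** Names the stage that the dispatch would pick for `src`. */
  def plan(src: Src[Any], detached: Boolean): String = src match {
    case EmptySrc       => "upstream unchanged" // both concat and concatLazy
    case _ if detached  => "concatGraph"        // concat keeps Concat + Detacher
    case SingleSrc(_)   => "SingleConcat"
    case IterableSrc(_) => "IterableConcat"
    case RepeatSrc(_)   => "RepeatConcat"
    case FailedSrc(_)   => "FailedConcat"
    case OtherSrc(_)    => "concatGraph"        // not value-presented
  }
}
```

For instance, `plan(SingleSrc(1), detached = false)` names `SingleConcat`, while the same source with `detached = true` names `concatGraph`.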