This is an automated email from the ASF dual-hosted git repository.

gregm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 66d7dc1946 Add Sink.eagerFutureSink to avoid NeverMaterializedException on empty streams (#2684)
66d7dc1946 is described below

commit 66d7dc19464c1e082e759f0595cd8976249b7b00
Author: Greg Methvin <[email protected]>
AuthorDate: Tue Mar 10 12:03:47 2026 -0700

    Add Sink.eagerFutureSink to avoid NeverMaterializedException on empty streams (#2684)
---
 .../stream/operators/Sink/completionStageSink.md   |   8 +
 .../operators/Sink/eagerCompletionStageSink.md     |  33 ++++
 .../stream/operators/Sink/eagerFutureSink.md       |  37 ++++
 .../paradox/stream/operators/Sink/futureSink.md    |   6 +
 .../operators/Sink/lazyCompletionStageSink.md      |   5 +-
 .../stream/operators/Sink/lazyFutureSink.md        |   4 +-
 docs/src/main/paradox/stream/operators/index.md    |   4 +
 .../pekko/stream/DslFactoriesConsistencySpec.scala |   1 +
 .../stream/scaladsl/EagerFutureSinkSpec.scala      | 193 +++++++++++++++++++++
 .../scala/org/apache/pekko/stream/impl/Sinks.scala | 128 ++++++++++++++
 .../org/apache/pekko/stream/impl/Stages.scala      |   1 +
 .../org/apache/pekko/stream/javadsl/Sink.scala     |  20 +++
 .../org/apache/pekko/stream/scaladsl/Sink.scala    |  18 ++
 13 files changed, 456 insertions(+), 2 deletions(-)

diff --git a/docs/src/main/paradox/stream/operators/Sink/completionStageSink.md b/docs/src/main/paradox/stream/operators/Sink/completionStageSink.md
index 186c187e6d..16a1b2f5eb 100644
--- a/docs/src/main/paradox/stream/operators/Sink/completionStageSink.md
+++ b/docs/src/main/paradox/stream/operators/Sink/completionStageSink.md
@@ -10,6 +10,14 @@ Streams the elements to the given future sink once it successfully completes.
 Streams the elements through the given future flow once it successfully completes. 
 If the future fails the stream is failed.
 
+`completionStageSink` uses the same lazy materialization semantics as
+@ref:[lazyCompletionStageSink](lazyCompletionStageSink.md): the nested sink is not materialized until the first
+upstream element arrives. If the stream completes before the first element, the materialized value fails with
+`org.apache.pekko.stream.NeverMaterializedException`.
+
+If you want this to work for empty streams as well, use
+@ref:[eagerCompletionStageSink](eagerCompletionStageSink.md).
+
 ## Reactive Streams semantics
 
 @@@div { .callout }
diff --git a/docs/src/main/paradox/stream/operators/Sink/eagerCompletionStageSink.md b/docs/src/main/paradox/stream/operators/Sink/eagerCompletionStageSink.md
new file mode 100644
index 0000000000..cc31625894
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Sink/eagerCompletionStageSink.md
@@ -0,0 +1,33 @@
+# Sink.eagerCompletionStageSink
+
+Materializes the inner sink when the future completes, even if no elements have arrived yet.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+
+## Description
+
+Turn a `CompletionStage<Sink>` into a Sink that will consume the values of the source when the future completes
+successfully. If the `CompletionStage` is completed with a failure the stream is failed.
+
+Unlike @ref:[completionStageSink](completionStageSink.md) and @ref:[lazyCompletionStageSink](lazyCompletionStageSink.md), this operator materializes the inner sink as soon as the future
+completes, even if no elements have arrived yet. This means empty streams complete normally rather than failing
+with `NeverMaterializedException`. At most one element that arrives before the future completes is buffered.
+
+The materialized future value is completed with the materialized value of the inner sink once it has been
+materialized, or failed if the `CompletionStage` itself fails or if materialization of the inner sink fails.
+Upstream failures or downstream cancellations that occur before the inner sink is materialized are propagated
+through the inner sink rather than failing the materialized value directly.
+
+See also @ref:[completionStageSink](completionStageSink.md), @ref:[lazyCompletionStageSink](lazyCompletionStageSink.md).
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**cancels** if the future fails or if the created sink cancels 
+
+**backpressures** when initialized and when created sink backpressures
+
+@@@
+
diff --git a/docs/src/main/paradox/stream/operators/Sink/eagerFutureSink.md b/docs/src/main/paradox/stream/operators/Sink/eagerFutureSink.md
new file mode 100644
index 0000000000..e481ff0044
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Sink/eagerFutureSink.md
@@ -0,0 +1,37 @@
+# Sink.eagerFutureSink
+
+Materializes the inner sink when the future completes, even if no elements have arrived yet.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.eagerFutureSink](Sink$) { scala="#eagerFutureSink[T,M](future:scala.concurrent.Future[org.apache.pekko.stream.scaladsl.Sink[T,M]]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]" }
+
+
+## Description
+
+Turn a `Future[Sink]` into a Sink that will consume the values of the source when the future completes
+successfully. If the `Future` is completed with a failure the stream is failed.
+
+Unlike @ref:[futureSink](futureSink.md) and @ref:[lazyFutureSink](lazyFutureSink.md), this operator materializes the inner sink as soon as the future
+completes, even if no elements have arrived yet. This means empty streams complete normally rather than failing
+with `NeverMaterializedException`. At most one element that arrives before the future completes is buffered.
+
+The materialized future value is completed with the materialized value of the inner sink once it has been
+materialized, or failed if the future itself fails or if materialization of the inner sink fails. Upstream
+failures or downstream cancellations that occur before the inner sink is materialized are propagated through
+the inner sink rather than failing the materialized value directly.
+
+See also @ref:[futureSink](futureSink.md), @ref:[lazyFutureSink](lazyFutureSink.md).
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**cancels** if the future fails or if the created sink cancels 
+
+**backpressures** when initialized and when created sink backpressures
+
+@@@
+
diff --git a/docs/src/main/paradox/stream/operators/Sink/futureSink.md b/docs/src/main/paradox/stream/operators/Sink/futureSink.md
index 59b50afeb3..45e54fab76 100644
--- a/docs/src/main/paradox/stream/operators/Sink/futureSink.md
+++ b/docs/src/main/paradox/stream/operators/Sink/futureSink.md
@@ -14,6 +14,12 @@ Streams the elements to the given future sink once it successfully completes.
 Streams the elements through the given future flow once it successfully completes. 
 If the future fails the stream is failed.
 
+`futureSink` uses the same lazy materialization semantics as @ref:[lazyFutureSink](lazyFutureSink.md): the nested sink
+is not materialized until the first upstream element arrives. If the stream completes before the first element, the
+materialized value fails with `org.apache.pekko.stream.NeverMaterializedException`.
+
+If you want this to work for empty streams as well, use @ref:[eagerFutureSink](eagerFutureSink.md).
+
 ## Reactive Streams semantics
 
 @@@div { .callout }
diff --git a/docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md b/docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md
index 5931ab4d25..14b66218e1 100644
--- a/docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md
+++ b/docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md
@@ -16,7 +16,10 @@ and failed with a `org.apache.pekko.stream.NeverMaterializedException` if the st
 
 Can be combined with @ref:[prefixAndTail](../Source-or-Flow/prefixAndTail.md) to base the sink on the first element.
 
-See also @ref:[lazySink](lazySink.md).
+If you need empty streams to complete normally, use
+@ref:[eagerCompletionStageSink](eagerCompletionStageSink.md).
+
+See also @ref:[lazySink](lazySink.md), @ref:[completionStageSink](completionStageSink.md).
 
 ## Reactive Streams semantics
 
diff --git a/docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md b/docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md
index b905c28672..0be83b8005 100644
--- a/docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md
+++ b/docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md
@@ -20,7 +20,9 @@ and failed with a `org.apache.pekko.stream.NeverMaterializedException` if the st
 
 Can be combined with @ref:[prefixAndTail](../Source-or-Flow/prefixAndTail.md) to base the sink on the first element.
 
-See also @ref:[lazySink](lazySink.md).
+If you need empty streams to complete normally, use @ref:[eagerFutureSink](eagerFutureSink.md).
+
+See also @ref:[lazySink](lazySink.md), @ref:[futureSink](futureSink.md).
 
 ## Reactive Streams semantics
 
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index 68bc137888..f39c251bb8 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -59,6 +59,8 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
 |Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
 |Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
 |Sink|<a name="count"></a>@ref[count](Sink/count.md)|Counts all incoming elements until upstream terminates.|
+|Sink|<a name="eagercompletionstagesink"></a>@ref[eagerCompletionStageSink](Sink/eagerCompletionStageSink.md)|Materializes the inner sink when the future completes, even if no elements have arrived yet.|
+|Sink|<a name="eagerfuturesink"></a>@ref[eagerFutureSink](Sink/eagerFutureSink.md)|Materializes the inner sink when the future completes, even if no elements have arrived yet.|
 |Sink|<a name="exists"></a>@ref[exists](Sink/exists.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
 |Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
 |Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
@@ -455,6 +457,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
 * [dropRepeated](Source-or-Flow/dropRepeated.md)
 * [dropWhile](Source-or-Flow/dropWhile.md)
 * [dropWithin](Source-or-Flow/dropWithin.md)
+* [eagerCompletionStageSink](Sink/eagerCompletionStageSink.md)
+* [eagerFutureSink](Sink/eagerFutureSink.md)
 * [empty](Source/empty.md)
 * [exists](Sink/exists.md)
 * [expand](Source-or-Flow/expand.md)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
index 3481be78ea..8f9fffa155 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
@@ -41,6 +41,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers {
       "lazyFutureFlow", // lazyCompletionStageFlow
       "futureFlow", // completionStageFlow
       "futureSink", // completionStageSink
+      "eagerFutureSink", // eagerCompletionStageSink
       "lazyFutureSink", // lazyCompletionStageSink
       "createGraph" // renamed/overload of create for getting type inference working in Scala 3
     )
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/EagerFutureSinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/EagerFutureSinkSpec.scala
new file mode 100644
index 0000000000..10d0374a6c
--- /dev/null
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/EagerFutureSinkSpec.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import scala.concurrent.{ Future, Promise }
+
+import org.apache.pekko
+import pekko.stream.{ AbruptStageTerminationException, Materializer }
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.Utils._
+
+class EagerFutureSinkSpec extends StreamSpec("""
+    pekko.stream.materializer.initial-input-buffer-size = 1
+    pekko.stream.materializer.max-input-buffer-size = 1
+  """) {
+
+  val ex = TE("")
+
+  "Sink.eagerFutureSink" must {
+
+    "work with an already-completed future" in {
+      val result = Source(List(1, 2, 3))
+        .toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
+        .run()
+        .flatten
+
+      result.futureValue shouldBe Seq(1, 2, 3)
+    }
+
+    "work when the future completes after elements arrive" in {
+      val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+      val result = Source(List(1, 2, 3))
+        .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+        .run()
+        .flatten
+
+      sinkPromise.success(Sink.seq[Int])
+      result.futureValue shouldBe Seq(1, 2, 3)
+    }
+
+    "handle an empty stream with an already-completed future" in {
+      val result = Source
+        .empty[Int]
+        .toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
+        .run()
+        .flatten
+
+      result.futureValue shouldBe Seq.empty
+    }
+
+    "handle an empty stream with a pending future" in {
+      val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+      val result = Source
+        .empty[Int]
+        .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+        .run()
+        .flatten
+
+      sinkPromise.success(Sink.seq[Int])
+      result.futureValue shouldBe Seq.empty
+    }
+
+    "propagate failure when the future fails" in {
+      val result = Source(List(1, 2, 3))
+        .toMat(Sink.eagerFutureSink(Future.failed[Sink[Int, Future[Seq[Int]]]](ex)))(Keep.right)
+        .run()
+        .flatten
+
+      result.failed.futureValue shouldBe ex
+    }
+
+    "propagate upstream failure" in {
+      val result = Source
+        .failed[Int](ex)
+        .toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
+        .run()
+        .flatten
+
+      result.failed.futureValue shouldBe ex
+    }
+
+    "propagate upstream failure when the future is still pending" in {
+      val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+      val result = Source
+        .failed[Int](ex)
+        .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+        .run()
+        .flatten
+
+      sinkPromise.success(Sink.seq[Int])
+      result.failed.futureValue shouldBe ex
+    }
+
+    "propagate upstream failure when element was buffered and future resolves later" in {
+      val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+      val result = Source(List(1))
+        .concat(Source.failed[Int](ex))
+        .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+        .run()
+        .flatten
+
+      sinkPromise.success(Sink.seq[Int])
+      result.failed.futureValue shouldBe ex
+    }
+
+    "work with Sink.fold on a non-empty stream" in {
+      val result = Source(List(1, 2, 3))
+        .toMat(Sink.eagerFutureSink(Future.successful(Sink.fold[Int, Int](0)(_ + _))))(Keep.right)
+        .run()
+        .flatten
+
+      result.futureValue shouldBe 6
+    }
+
+    "work with Sink.fold on an empty stream" in {
+      val result = Source
+        .empty[Int]
+        .toMat(Sink.eagerFutureSink(Future.successful(Sink.fold[Int, Int](0)(_ + _))))(Keep.right)
+        .run()
+        .flatten
+
+      result.futureValue shouldBe 0
+    }
+
+    "not throw NeverMaterializedException on empty stream (unlike futureSink)" in {
+      val result = Source
+        .empty[Int]
+        .toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
+        .run()
+        .flatten
+
+      result.futureValue shouldBe Seq.empty
+    }
+
+    "materialize inner sink immediately when the future is already completed (even with no elements yet)" in {
+      val innerMatPromise = Promise[Unit]()
+      val sink = Sink.foreach[Int](_ => ()).mapMaterializedValue(_ => innerMatPromise.success(()))
+      val sinkFuture = Future.successful(sink)
+
+      Source.maybe[Int]
+        .toMat(Sink.eagerFutureSink(sinkFuture))(Keep.right)
+        .run()
+
+      innerMatPromise.future.futureValue shouldBe (())
+    }
+
+    "cancel upstream when inner sink cancels" in {
+      val result = Source(List(1, 2, 3, 4, 5))
+        .toMat(Sink.eagerFutureSink(Future.successful(Sink.head[Int])))(Keep.right)
+        .run()
+        .flatten
+
+      result.futureValue shouldBe 1
+    }
+
+    "propagate failure when the future fails late" in {
+      val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+      val result = Source(List(1, 2, 3))
+        .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+        .run()
+
+      sinkPromise.failure(ex)
+      result.failed.futureValue shouldBe ex
+    }
+
+    "fail the materialized value on abrupt termination before future completion" in {
+      val mat = Materializer(system)
+      val sinkPromise = Promise[Sink[Int, Future[Seq[Int]]]]()
+      val result = Source.maybe[Int]
+        .toMat(Sink.eagerFutureSink(sinkPromise.future))(Keep.right)
+        .run()(mat)
+
+      mat.shutdown()
+
+      result.failed.futureValue shouldBe an[AbruptStageTerminationException]
+    }
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
index dc2ac01da7..08eb2942f6 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
@@ -40,6 +40,7 @@ import pekko.stream.impl.Stages.DefaultAttributes
 import pekko.stream.impl.StreamLayout.AtomicModule
 import pekko.stream.scaladsl.{ Keep, Sink, SinkQueueWithCancel, Source }
 import pekko.stream.stage._
+import pekko.util.OptionVal
 
 import org.reactivestreams.Publisher
 import org.reactivestreams.Subscriber
@@ -661,3 +662,130 @@ import org.reactivestreams.Subscriber
     (stageLogic, promise.future)
   }
 }
+
+/**
+ * INTERNAL API
+ *
+ * Dedicated stage for [[pekko.stream.scaladsl.Sink.eagerFutureSink]] that materializes the inner sink
+ * when the future completes rather than waiting for the first element. Unlike [[LazySink]], this
+ * correctly handles empty streams by materializing the inner sink and completing it normally.
+ */
+@InternalApi final private[stream] class EagerFutureSink[T, M](future: Future[Sink[T, M]])
+    extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
+  val in = Inlet[T]("eagerFutureSink.in")
+  override def initialAttributes = DefaultAttributes.eagerFutureSink
+  override val shape: SinkShape[T] = SinkShape.of(in)
+
+  override def toString: String = "EagerFutureSink"
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
+    val promise = Promise[M]()
+    val stageLogic = new GraphStageLogic(shape) with InHandler {
+      private var bufferedElement: OptionVal[T] = OptionVal.none
+      private var upstreamFailed: OptionVal[Throwable] = OptionVal.none
+      private var upstreamClosed = false
+
+      override def preStart(): Unit = {
+        pull(in)
+        val cb = getAsyncCallback[Try[Sink[T, M]]] {
+          case Success(sink) => onSinkReady(sink)
+          case Failure(e)    =>
+            promise.tryFailure(e)
+            failStage(e)
+        }
+        try {
+          future.onComplete(cb.invoke)(ExecutionContext.parasitic)
+        } catch {
+          case NonFatal(e) =>
+            promise.tryFailure(e)
+            failStage(e)
+        }
+        setKeepGoing(true)
+      }
+
+      override def onPush(): Unit = {
+        bufferedElement = OptionVal.Some(grab(in))
+      }
+
+      override def onUpstreamFinish(): Unit = {
+        upstreamClosed = true
+      }
+
+      override def onUpstreamFailure(ex: Throwable): Unit = {
+        upstreamFailed = OptionVal.Some(ex)
+        upstreamClosed = true
+      }
+
+      private def onSinkReady(sink: Sink[T, M]): Unit = {
+        if (promise.isCompleted) return
+        try {
+          val subOutlet = new SubSourceOutlet[T]("EagerFutureSink")
+          val matVal = interpreter.subFusingMaterializer
+            .materialize(Source.fromGraph(subOutlet.source).toMat(sink)(Keep.right), inheritedAttributes)
+          promise.trySuccess(matVal)
+
+          setHandler(
+            in,
+            new InHandler {
+              override def onPush(): Unit = subOutlet.push(grab(in))
+              override def onUpstreamFinish(): Unit = {
+                subOutlet.complete()
+                completeStage()
+              }
+              override def onUpstreamFailure(ex: Throwable): Unit = {
+                subOutlet.fail(ex)
+                failStage(ex)
+              }
+            })
+
+          subOutlet.setHandler(new OutHandler {
+            override def onPull(): Unit = {
+              bufferedElement match {
+                case OptionVal.Some(elem) =>
+                  bufferedElement = OptionVal.none
+                  subOutlet.push(elem)
+                  if (upstreamClosed) {
+                    subOutlet.complete()
+                    completeStage()
+                  }
+                case _ =>
+                  if (upstreamClosed) {
+                    subOutlet.complete()
+                    completeStage()
+                  } else if (!isClosed(in)) {
+                    pull(in)
+                  }
+              }
+            }
+            override def onDownstreamFinish(cause: Throwable): Unit = {
+              if (!isClosed(in)) cancel(in, cause)
+              completeStage()
+            }
+          })
+
+          upstreamFailed match {
+            case OptionVal.Some(ex) =>
+              subOutlet.fail(ex)
+              failStage(ex)
+            case _ =>
+              if (upstreamClosed && bufferedElement.isEmpty) {
+                subOutlet.complete()
+                setKeepGoing(false)
+              }
+          }
+        } catch {
+          case NonFatal(e) =>
+            promise.tryFailure(e)
+            failStage(e)
+        }
+      }
+
+      override def postStop(): Unit = {
+        if (!promise.isCompleted) promise.failure(new AbruptStageTerminationException(this))
+      }
+
+      setHandler(in, this)
+    }
+    (stageLogic, promise.future)
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 2d4bd30c98..6de53600bd 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -167,6 +167,7 @@ import pekko.stream.Attributes._
     val actorRefWithBackpressureSink = name("actorRefWithBackpressureSink")
     val queueSink = name("queueSink")
     val lazySink = name("lazySink")
+    val eagerFutureSink = name("eagerFutureSink")
     val lazyFlow = name("lazyFlow")
     val futureFlow = name("futureFlow")
     val lazySource = name("lazySource")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index bcf53dbbf0..2b23421f8c 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -525,6 +525,26 @@ object Sink {
   def completionStageSink[T, M](future: CompletionStage[Sink[T, M]]): Sink[T, CompletionStage[M]] =
     lazyCompletionStageSink[T, M](() => future)
 
+  /**
+   * Turn a `CompletionStage[Sink]` into a Sink that will consume the values of the source when the future completes
+   * successfully. If the `CompletionStage` is completed with a failure the stream is failed.
+   *
+   * Unlike [[completionStageSink]] and [[lazyCompletionStageSink]], this operator materializes the inner sink as
+   * soon as the future completes, even if no elements have arrived yet. This means empty streams complete normally
+   * rather than failing with [[NeverMaterializedException]]. At most one element that arrives before the future
+   * completes is buffered.
+   *
+   * The materialized future value is completed with the materialized value of the inner sink once it has been
+   * materialized, or failed if the `CompletionStage` itself fails or if materialization of the inner sink fails.
+   * Upstream failures or downstream cancellations that occur before the inner sink is materialized are propagated
+   * through the inner sink rather than failing the materialized value directly.
+   *
+   * @since 1.5.0
+   */
+  def eagerCompletionStageSink[T, M](future: CompletionStage[Sink[T, M]]): Sink[T, CompletionStage[M]] =
+    new Sink(scaladsl.Sink.eagerFutureSink(future.asScala.map(_.asScala)(ExecutionContext.parasitic)))
+      .mapMaterializedValue(_.asJava)
+
   /**
    * Defers invoking the `create` function to create a sink until there is a first element passed from upstream.
    *
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 03f97f94f8..a81c3ba97b 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -733,6 +733,24 @@ object Sink {
   def futureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]] =
     lazyFutureSink[T, M](() => future)
 
+  /**
+   * Turn a `Future[Sink]` into a Sink that will consume the values of the source when the future completes
+   * successfully. If the `Future` is completed with a failure the stream is failed.
+   *
+   * Unlike [[futureSink]] and [[lazyFutureSink]], this operator materializes the inner sink as soon as the future
+   * completes, even if no elements have arrived yet. This means empty streams complete normally rather than failing
+   * with [[NeverMaterializedException]]. At most one element that arrives before the future completes is buffered.
+   *
+   * The materialized future value is completed with the materialized value of the inner sink once it has been
+   * materialized, or failed if the future itself fails or if materialization of the inner sink fails. Upstream
+   * failures or downstream cancellations that occur before the inner sink is materialized are propagated through
+   * the inner sink rather than failing the materialized value directly.
+   *
+   * @since 1.5.0
+   */
+  def eagerFutureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]] =
+    Sink.fromGraph(new EagerFutureSink(future))
+
   /**
    * Defers invoking the `create` function to create a sink until there is a first element passed from upstream.
    *


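Usage sketch (not part of the commit): the difference between the two operators on an empty stream, as described in the documentation changes above, might be exercised roughly as follows. This assumes a Pekko build that already contains `Sink.eagerFutureSink` (the Scaladoc above marks it `@since 1.5.0`); the object name, the actor system name and the timeouts are hypothetical choices for illustration only.

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.NeverMaterializedException
import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source }

// Hypothetical demo object; only the Sink/Source calls come from the commit.
object EagerFutureSinkExample {
  def main(args: Array[String]): Unit = {
    // The implicit ActorSystem also supplies the stream Materializer.
    implicit val system: ActorSystem = ActorSystem("eager-future-sink-example")

    try {
      // futureSink: the inner sink is only materialized on the first element,
      // so an empty stream fails the materialized value.
      val lazyResult: Future[Seq[Int]] =
        Source.empty[Int]
          .toMat(Sink.futureSink(Future.successful(Sink.seq[Int])))(Keep.right)
          .run()
          .flatten

      val lazyFailure = Await.result(lazyResult.failed, 3.seconds)
      assert(lazyFailure.isInstanceOf[NeverMaterializedException])

      // eagerFutureSink: the inner sink is materialized once the future completes,
      // so the same empty stream completes normally with an empty Seq.
      val eagerResult: Future[Seq[Int]] =
        Source.empty[Int]
          .toMat(Sink.eagerFutureSink(Future.successful(Sink.seq[Int])))(Keep.right)
          .run()
          .flatten

      assert(Await.result(eagerResult, 3.seconds).isEmpty)
    } finally {
      // Shut the actor system down so the JVM can exit.
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

The blocking `Await` calls are only there to keep the sketch short; application code would normally compose the resulting futures instead.
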
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
