This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new eeaec22bd5 !str Eagerly fails flow if the future is already failed.
eeaec22bd5 is described below

commit eeaec22bd516e45db7536db7dc26567918c8dd91
Author: kerr <[email protected]>
AuthorDate: Mon Nov 14 13:12:46 2022 +0800

    !str Eagerly fails flow if the future is already failed.
---
 .../pekko/stream/scaladsl/FlowFromFutureSpec.scala |  8 ++++++++
 .../pekko/stream/impl/fusing/GraphStages.scala     | 22 ++++++++++------------
 2 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala
index 48cde853b2..54709e4b07 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala
@@ -20,6 +20,7 @@ import scala.concurrent.duration._
 import scala.util.control.NoStackTrace
 
 import org.apache.pekko.stream.testkit._
+import org.apache.pekko.stream.testkit.scaladsl.TestSink
 
 @nowarn("msg=deprecated") // testing deprecated API
 class FlowFromFutureSpec extends StreamSpec {
@@ -42,6 +43,13 @@ class FlowFromFutureSpec extends StreamSpec {
       c.expectSubscriptionAndError(ex)
     }
 
+    "fails flow from already failed Future even no demands" in {
+      val ex = new RuntimeException("test") with NoStackTrace
+      val sub = Source.fromFuture(Future.failed[Int](ex))
+        .runWith(TestSink.probe)
+      sub.expectSubscriptionAndError(ex)
+    }
+
     "produce one element when Future is completed" in {
       val promise = Promise[Int]()
       val c = TestSubscriber.manualProbe[Int]()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
index 036976b238..f3319b0a7e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
@@ -390,27 +390,25 @@ import pekko.stream.stage._
     override def initialAttributes: Attributes = DefaultAttributes.futureSource
     override def createLogic(attr: Attributes) =
       new GraphStageLogic(shape) with OutHandler {
-        def onPull(): Unit = {
+        override def preStart(): Unit = {
           future.value match {
             case Some(completed) =>
               // optimization if the future is already completed
-              onFutureCompleted(completed)
+              handle(completed)
             case None =>
-              val cb = getAsyncCallback[Try[T]](onFutureCompleted).invoke _
+              val cb = getAsyncCallback[Try[T]](handle).invoke _
               future.onComplete(cb)(ExecutionContexts.parasitic)
           }
+        }
 
-          def onFutureCompleted(result: Try[T]): Unit = {
-            result match {
-              case scala.util.Success(null) => completeStage()
-              case scala.util.Success(v)    => emit(out, v, () => completeStage())
-              case scala.util.Failure(t)    => failStage(t)
-            }
-          }
-
-          setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more
+        private def handle(result: Try[T]): Unit = result match {
+          case scala.util.Success(null) => completeStage()
+          case scala.util.Success(v)    => emit(out, v, () => completeStage())
+          case scala.util.Failure(t)    => failStage(t)
         }
 
+        def onPull(): Unit = ()
+
         setHandler(out, this)
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to