This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new eeaec22bd5 !str Eagerly fails flow if the future is already failed.
eeaec22bd5 is described below
commit eeaec22bd516e45db7536db7dc26567918c8dd91
Author: kerr <[email protected]>
AuthorDate: Mon Nov 14 13:12:46 2022 +0800
!str Eagerly fails flow if the future is already failed.
---
.../pekko/stream/scaladsl/FlowFromFutureSpec.scala | 8 ++++++++
.../pekko/stream/impl/fusing/GraphStages.scala | 22 ++++++++++------------
2 files changed, 18 insertions(+), 12 deletions(-)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala
index 48cde853b2..54709e4b07 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFromFutureSpec.scala
@@ -20,6 +20,7 @@ import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import org.apache.pekko.stream.testkit._
+import org.apache.pekko.stream.testkit.scaladsl.TestSink
@nowarn("msg=deprecated") // testing deprecated API
class FlowFromFutureSpec extends StreamSpec {
@@ -42,6 +43,13 @@ class FlowFromFutureSpec extends StreamSpec {
c.expectSubscriptionAndError(ex)
}
+ "fails flow from already failed Future even no demands" in {
+ val ex = new RuntimeException("test") with NoStackTrace
+ val sub = Source.fromFuture(Future.failed[Int](ex))
+ .runWith(TestSink.probe)
+ sub.expectSubscriptionAndError(ex)
+ }
+
"produce one element when Future is completed" in {
val promise = Promise[Int]()
val c = TestSubscriber.manualProbe[Int]()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
index 036976b238..f3319b0a7e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala
@@ -390,27 +390,25 @@ import pekko.stream.stage._
override def initialAttributes: Attributes = DefaultAttributes.futureSource
override def createLogic(attr: Attributes) =
new GraphStageLogic(shape) with OutHandler {
- def onPull(): Unit = {
+ override def preStart(): Unit = {
future.value match {
case Some(completed) =>
// optimization if the future is already completed
- onFutureCompleted(completed)
+ handle(completed)
case None =>
- val cb = getAsyncCallback[Try[T]](onFutureCompleted).invoke _
+ val cb = getAsyncCallback[Try[T]](handle).invoke _
future.onComplete(cb)(ExecutionContexts.parasitic)
}
+ }
- def onFutureCompleted(result: Try[T]): Unit = {
- result match {
- case scala.util.Success(null) => completeStage()
- case scala.util.Success(v) => emit(out, v, () => completeStage())
- case scala.util.Failure(t) => failStage(t)
- }
- }
-
- setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more
+ private def handle(result: Try[T]): Unit = result match {
+ case scala.util.Success(null) => completeStage()
+ case scala.util.Success(v) => emit(out, v, () => completeStage())
+ case scala.util.Failure(t) => failStage(t)
}
+ def onPull(): Unit = ()
+
setHandler(out, this)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]