This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch 1.5.x-alreadyCompleted in repository https://gitbox.apache.org/repos/asf/pekko.git
commit f5eda5d3ee3df27322e2fcccda53d5d3c75d0aa4 Author: He-Pin(kerr) <[email protected]> AuthorDate: Sun Dec 14 19:13:03 2025 +0800 chore: optimize Source#future and Source#futureSource (#2560) * chore: optimize Source#future and Source#futureSource --------- Co-authored-by: PJ Fanning <[email protected]> (cherry picked from commit 01f2265fb2d69e4e82924680740d1b0e3e212f10) # Conflicts: # stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala --- .../pekko/stream/impl/TraversalBuilderSpec.scala | 7 +- .../apache/pekko/stream/scaladsl/SourceSpec.scala | 83 +++++++++++++++++++++- .../org/apache/pekko/stream/scaladsl/Source.scala | 18 +++-- 3 files changed, 100 insertions(+), 8 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala index 5bac7f5a3c..a671832502 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala @@ -13,6 +13,8 @@ package org.apache.pekko.stream.impl +import scala.concurrent.Promise + import org.apache.pekko import pekko.NotUsed import pekko.stream._ @@ -23,8 +25,6 @@ import pekko.stream.scaladsl.{ Keep, Source } import pekko.util.OptionVal import pekko.testkit.PekkoSpec -import scala.concurrent.Future - class TraversalBuilderSpec extends PekkoSpec { "CompositeTraversalBuilder" must { @@ -508,7 +508,8 @@ class TraversalBuilderSpec extends PekkoSpec { } "find Source.future via TraversalBuilder with getValuePresentedSource" in { - val future = Future.successful("a") + val promise = Promise[String]() + val future = promise.future TraversalBuilder.getValuePresentedSource(Source.future(future)).get.asInstanceOf[FutureSource[ String]].future should ===( future) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index 780760f6a7..c36793220a 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -547,8 +547,49 @@ class SourceSpec extends StreamSpec with DefaultTimeout { } } - "Source.futureSource" must { + "Source.future" must { + "work as empty source when the future source completes with null" in { + val source = Source.future(Future.successful(null.asInstanceOf[String])) + val probe = source.runWith(TestSink[String]()) + + probe.request(1) + probe.expectComplete() + } + + "work with a successful future" in { + val source = Source.future(Future.successful(42)) + val probe = source.runWith(TestSink[Int]()) + + probe.request(1) + probe.expectNext(42) + probe.expectComplete() + } + "work with a failed future" in { + val ex = new RuntimeException("boom") + val source = Source.future(Future.failed(ex)) + val probe = source.runWith(TestSink[Int]()) + + probe.request(1) + probe.expectError().getMessage should ===("boom") + } + + "work with a delayed future" in { + val promise = scala.concurrent.Promise[Int]() + val source = Source.future(promise.future) + val probe = source.runWith(TestSink[Int]()) + + probe.request(1) + probe.expectNoMessage(500.millis) + + promise.success(42) + + probe.expectNext(42) + probe.expectComplete() + } + } + + "Source.futureSource" must { "not cancel substream twice" in { val result = Source .futureSource(pekko.pattern.after(2.seconds)(Future.successful(Source(1 to 2)))) @@ -558,6 +599,46 @@ class SourceSpec extends StreamSpec with DefaultTimeout { Await.result(result, 4.seconds) shouldBe Done } + + "fail when the future completes with null" in { + val source = Source.futureSource(Future.successful(null.asInstanceOf[Source[Int, NotUsed]])) + val probe = source.runWith(TestSink[Int]()) + + probe.request(1) + probe.expectError().getMessage should include("futureSource completed with null") + } + + "work with a successful future" in { + val source = Source.futureSource(Future.successful(Source(1 to 3))) + val probe = source.runWith(TestSink[Int]()) + + probe.request(3) + probe.expectNext(1, 2, 3) + probe.expectComplete() + } + + "work with a failed future source" in { + val ex = new RuntimeException("boom") + val source = Source.futureSource(Future.failed(ex)) + val probe = source.runWith(TestSink[Int]()) + + probe.request(1) + probe.expectError().getMessage should ===("boom") + } + + "work with a delayed future source" in { + val promise = scala.concurrent.Promise[Source[Int, NotUsed]]() + val source = Source.futureSource(promise.future) + val probe = source.runWith(TestSink[Int]()) + + probe.request(3) + probe.expectNoMessage(500.millis) + + promise.success(Source(1 to 3)) + + probe.expectNext(1, 2, 3) + probe.expectComplete() + } } "Source of sources" must { diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index c1b512a7ce..b4d9bc18e3 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -639,8 +639,12 @@ object Source { * Emits a single value when the given `Future` is successfully completed and then completes the stream. * The stream fails if the `Future` is completed with a failure. */ - def future[T](futureElement: Future[T]): Source[T, NotUsed] = - fromGraph(new FutureSource[T](futureElement)) + def future[T](futureElement: Future[T]): Source[T, NotUsed] = futureElement.value match { + case None => fromGraph(new FutureSource[T](futureElement)) + case Some(scala.util.Success(null)) => empty[T] + case Some(scala.util.Success(elem)) => single(elem) + case Some(scala.util.Failure(ex)) => failed[T](ex) + } /** * Never emits any elements, never completes and never fails. @@ -662,8 +666,14 @@ object Source { * Turn a `Future[Source]` into a source that will emit the values of the source when the future completes successfully. * If the `Future` is completed with a failure the stream is failed. */ - def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = - fromGraph(new FutureFlattenSource(futureSource)) + def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = futureSource.value match { + case None => fromGraph(new FutureFlattenSource(futureSource)) + case Some(scala.util.Success(null)) => + val exception = new NullPointerException("futureSource completed with null") + Source.failed(exception).mapMaterializedValue(_ => Future.failed[M](exception)) + case Some(scala.util.Success(source)) => source.mapMaterializedValue(Future.successful) + case Some(scala.util.Failure(ex)) => Source.failed[T](ex).mapMaterializedValue(_ => Future.failed[M](ex)) + } /** * Defers invoking the `create` function to create a single element until there is downstream demand. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
