This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch alreadyCompletedFuture
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 2cc7584dcec62271b1e33c1f0a8ed7a67fb17547
Author: He-Pin <[email protected]>
AuthorDate: Sun Dec 7 18:25:46 2025 +0800

    chore: optimize Source#future and Source#futureSource
---
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  | 83 +++++++++++++++++++++-
 .../org/apache/pekko/stream/scaladsl/Source.scala  | 18 +++--
 2 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 780760f6a7..c36793220a 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -547,8 +547,49 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
     }
   }
 
-  "Source.futureSource" must {
+  "Source.future" must {
+    "work as empty source when the future source completes with null" in {
+      val source = Source.future(Future.successful(null.asInstanceOf[String]))
+      val probe = source.runWith(TestSink[String]())
+
+      probe.request(1)
+      probe.expectComplete()
+    }
+
+    "work with a successful future" in {
+      val source = Source.future(Future.successful(42))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectNext(42)
+      probe.expectComplete()
+    }
 
+    "work with a failed future" in {
+      val ex = new RuntimeException("boom")
+      val source = Source.future(Future.failed(ex))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectError().getMessage should ===("boom")
+    }
+
+    "work with a delayed future" in {
+      val promise = scala.concurrent.Promise[Int]()
+      val source = Source.future(promise.future)
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectNoMessage(500.millis)
+
+      promise.success(42)
+
+      probe.expectNext(42)
+      probe.expectComplete()
+    }
+  }
+
+  "Source.futureSource" must {
     "not cancel substream twice" in {
       val result = Source
         .futureSource(pekko.pattern.after(2.seconds)(Future.successful(Source(1 to 2))))
@@ -558,6 +599,46 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
 
       Await.result(result, 4.seconds) shouldBe Done
     }
+
+    "fail when the future completes with null" in {
+      val source = Source.futureSource(Future.successful(null.asInstanceOf[Source[Int, NotUsed]]))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectError().getMessage should include("futureSource completed with null")
+    }
+
+    "work with a successful future" in {
+      val source = Source.futureSource(Future.successful(Source(1 to 3)))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(3)
+      probe.expectNext(1, 2, 3)
+      probe.expectComplete()
+    }
+
+    "work with a failed future source" in {
+      val ex = new RuntimeException("boom")
+      val source = Source.futureSource(Future.failed(ex))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectError().getMessage should ===("boom")
+    }
+
+    "work with a delayed future source" in {
+      val promise = scala.concurrent.Promise[Source[Int, NotUsed]]()
+      val source = Source.futureSource(promise.future)
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(3)
+      probe.expectNoMessage(500.millis)
+
+      promise.success(Source(1 to 3))
+
+      probe.expectNext(1, 2, 3)
+      probe.expectComplete()
+    }
   }
 
   "Source of sources" must {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 96b0c55c0d..ab456b704e 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -561,8 +561,12 @@ object Source {
    * Emits a single value when the given `Future` is successfully completed and then completes the stream.
    * The stream fails if the `Future` is completed with a failure.
    */
-  def future[T](futureElement: Future[T]): Source[T, NotUsed] =
-    fromGraph(new FutureSource[T](futureElement))
+  def future[T](futureElement: Future[T]): Source[T, NotUsed] = futureElement.value match {
+    case Some(scala.util.Success(null)) => empty[T]
+    case Some(scala.util.Success(elem)) => single(elem)
+    case Some(scala.util.Failure(ex))   => failed[T](ex)
+    case _                              => fromGraph(new FutureSource[T](futureElement))
+  }
 
   /**
    * Never emits any elements, never completes and never fails.
@@ -584,8 +588,14 @@ object Source {
    * Turn a `Future[Source]` into a source that will emit the values of the source when the future completes successfully.
    * If the `Future` is completed with a failure the stream is failed.
    */
-  def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] =
-    fromGraph(new FutureFlattenSource(futureSource))
+  def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = futureSource.value match {
+    case Some(scala.util.Success(null)) =>
+      val exception = new NullPointerException("futureSource completed with null")
+      Source.failed(exception).mapMaterializedValue(_ => Future.failed[M](exception))
+    case Some(scala.util.Success(source)) => source.mapMaterializedValue(Future.successful)
+    case Some(scala.util.Failure(ex))     => Source.failed[T](ex).mapMaterializedValue(_ => Future.failed[M](ex))
+    case _                                => fromGraph(new FutureFlattenSource(futureSource))
+  }
 
   /**
    * Defers invoking the `create` function to create a single element until there is downstream demand.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to