This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.5.x by this push:
     new 07b19b07d6 chore: optimize Source#future and Source#futureSource (#2560) (#2583)
07b19b07d6 is described below

commit 07b19b07d699b1dd089fc0095e542ff8c38afdc0
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Dec 15 22:43:28 2025 +0800

    chore: optimize Source#future and Source#futureSource (#2560) (#2583)
    
    * chore: optimize Source#future and Source#futureSource
    
    ---------
    
    Co-authored-by: PJ Fanning <[email protected]>
    (cherry picked from commit 01f2265fb2d69e4e82924680740d1b0e3e212f10)
---
 .../pekko/stream/impl/TraversalBuilderSpec.scala   |  7 +-
 .../apache/pekko/stream/scaladsl/SourceSpec.scala  | 83 +++++++++++++++++++++-
 .../org/apache/pekko/stream/scaladsl/Source.scala  | 18 +++--
 3 files changed, 100 insertions(+), 8 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
index 5bac7f5a3c..a671832502 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
@@ -13,6 +13,8 @@
 
 package org.apache.pekko.stream.impl
 
+import scala.concurrent.Promise
+
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.stream._
@@ -23,8 +25,6 @@ import pekko.stream.scaladsl.{ Keep, Source }
 import pekko.util.OptionVal
 import pekko.testkit.PekkoSpec
 
-import scala.concurrent.Future
-
 class TraversalBuilderSpec extends PekkoSpec {
 
   "CompositeTraversalBuilder" must {
@@ -508,7 +508,8 @@ class TraversalBuilderSpec extends PekkoSpec {
   }
 
   "find Source.future via TraversalBuilder with getValuePresentedSource" in {
-    val future = Future.successful("a")
+    val promise = Promise[String]()
+    val future = promise.future
     
TraversalBuilder.getValuePresentedSource(Source.future(future)).get.asInstanceOf[FutureSource[
       String]].future should ===(
       future)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 780760f6a7..c36793220a 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -547,8 +547,49 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
     }
   }
 
-  "Source.futureSource" must {
+  "Source.future" must {
+    "work as empty source when the future source completes with null" in {
+      val source = Source.future(Future.successful(null.asInstanceOf[String]))
+      val probe = source.runWith(TestSink[String]())
+
+      probe.request(1)
+      probe.expectComplete()
+    }
+
+    "work with a successful future" in {
+      val source = Source.future(Future.successful(42))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectNext(42)
+      probe.expectComplete()
+    }
 
+    "work with a failed future" in {
+      val ex = new RuntimeException("boom")
+      val source = Source.future(Future.failed(ex))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectError().getMessage should ===("boom")
+    }
+
+    "work with a delayed future" in {
+      val promise = scala.concurrent.Promise[Int]()
+      val source = Source.future(promise.future)
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectNoMessage(500.millis)
+
+      promise.success(42)
+
+      probe.expectNext(42)
+      probe.expectComplete()
+    }
+  }
+
+  "Source.futureSource" must {
     "not cancel substream twice" in {
       val result = Source
         
.futureSource(pekko.pattern.after(2.seconds)(Future.successful(Source(1 to 2))))
@@ -558,6 +599,46 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
 
       Await.result(result, 4.seconds) shouldBe Done
     }
+
+    "fail when the future completes with null" in {
+      val source = 
Source.futureSource(Future.successful(null.asInstanceOf[Source[Int, NotUsed]]))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectError().getMessage should include("futureSource completed 
with null")
+    }
+
+    "work with a successful future" in {
+      val source = Source.futureSource(Future.successful(Source(1 to 3)))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(3)
+      probe.expectNext(1, 2, 3)
+      probe.expectComplete()
+    }
+
+    "work with a failed future source" in {
+      val ex = new RuntimeException("boom")
+      val source = Source.futureSource(Future.failed(ex))
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(1)
+      probe.expectError().getMessage should ===("boom")
+    }
+
+    "work with a delayed future source" in {
+      val promise = scala.concurrent.Promise[Source[Int, NotUsed]]()
+      val source = Source.futureSource(promise.future)
+      val probe = source.runWith(TestSink[Int]())
+
+      probe.request(3)
+      probe.expectNoMessage(500.millis)
+
+      promise.success(Source(1 to 3))
+
+      probe.expectNext(1, 2, 3)
+      probe.expectComplete()
+    }
   }
 
   "Source of sources" must {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index c1b512a7ce..b4d9bc18e3 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -639,8 +639,12 @@ object Source {
    * Emits a single value when the given `Future` is successfully completed and then completes the stream.
    * The stream fails if the `Future` is completed with a failure.
    */
-  def future[T](futureElement: Future[T]): Source[T, NotUsed] =
-    fromGraph(new FutureSource[T](futureElement))
+  def future[T](futureElement: Future[T]): Source[T, NotUsed] = 
futureElement.value match {
+    case None                           => fromGraph(new 
FutureSource[T](futureElement))
+    case Some(scala.util.Success(null)) => empty[T]
+    case Some(scala.util.Success(elem)) => single(elem)
+    case Some(scala.util.Failure(ex))   => failed[T](ex)
+  }
 
   /**
    * Never emits any elements, never completes and never fails.
@@ -662,8 +666,14 @@ object Source {
    * Turn a `Future[Source]` into a source that will emit the values of the source when the future completes successfully.
    * If the `Future` is completed with a failure the stream is failed.
    */
-  def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, 
Future[M]] =
-    fromGraph(new FutureFlattenSource(futureSource))
+  def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, 
Future[M]] = futureSource.value match {
+    case None                           => fromGraph(new 
FutureFlattenSource(futureSource))
+    case Some(scala.util.Success(null)) =>
+      val exception = new NullPointerException("futureSource completed with 
null")
+      Source.failed(exception).mapMaterializedValue(_ => 
Future.failed[M](exception))
+    case Some(scala.util.Success(source)) => 
source.mapMaterializedValue(Future.successful)
+    case Some(scala.util.Failure(ex))     => 
Source.failed[T](ex).mapMaterializedValue(_ => Future.failed[M](ex))
+  }
 
   /**
    * Defers invoking the `create` function to create a single element until there is downstream demand.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to