This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.3.x by this push:
     new 09e370860a feat: Add Source#apply for Array (#2474)
09e370860a is described below

commit 09e370860a9b006ceda9b181e926d9e52d529da8
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue Nov 11 01:01:17 2025 +0800

    feat: Add Source#apply for Array (#2474)
---
 .../pekko/stream/DslFactoriesConsistencySpec.scala      |  1 +
 .../org/apache/pekko/stream/scaladsl/SourceSpec.scala   | 12 ++++++++++++
 .../scala/org/apache/pekko/stream/javadsl/Source.scala  |  5 ++---
 .../scala/org/apache/pekko/stream/scaladsl/Source.scala | 17 ++++++++++++++++-
 4 files changed, 31 insertions(+), 4 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
index 93defd9017..29565b52a4 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
@@ -55,6 +55,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with 
Matchers {
     ("apply" -> "fromGraph") ::
     ("apply" -> "fromIterator") ::
     ("apply" -> "fromFunctions") ::
+    ("apply" -> "fromArray") ::
     Nil
 
   // format: OFF
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 87c20154d2..780760f6a7 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -62,6 +62,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
       c.expectComplete()
     }
 
+    "product elements with Array" in {
+      val p = Source(Array(1, 2, 3)).runWith(Sink.asPublisher(false))
+      val c = TestSubscriber.manualProbe[Int]()
+      p.subscribe(c)
+      val sub = c.expectSubscription()
+      sub.request(3)
+      c.expectNext(1)
+      c.expectNext(2)
+      c.expectNext(3)
+      c.expectComplete()
+    }
+
     "reject later subscriber" in {
       val p = Source.single(1).runWith(Sink.asPublisher(false))
       val c1 = TestSubscriber.manualProbe[Int]()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 7135becfb1..5cc5401219 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -36,7 +36,7 @@ import pekko.japi.function.Creator
 import pekko.stream._
 import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava 
}
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ ArraySource, StatefulMapConcat, 
ZipWithIndexJava }
+import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
 import pekko.util.{ unused, _ }
 import pekko.util.FutureConverters._
 import pekko.util.JavaDurationConverters._
@@ -201,8 +201,7 @@ object Source {
    *
    * @since 1.1.0
    */
-  def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new 
Source(scaladsl.Source.fromGraph(
-    new ArraySource[T](array)))
+  def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new 
Source(scaladsl.Source(array))
 
   /**
    * Create a `Source` from an `Optional` value, emitting the value if it is 
present.
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 255b38b86a..c1b512a7ce 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -29,7 +29,7 @@ import pekko.annotation.InternalApi
 import pekko.stream._
 import pekko.stream.impl._
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ GraphStages, IterableSource, 
LazyFutureSource, LazySingleSource }
+import pekko.stream.impl.fusing.{ ArraySource, GraphStages, IterableSource, 
LazyFutureSource, LazySingleSource }
 import pekko.stream.impl.fusing.GraphStages._
 import pekko.stream.stage.GraphStageWithMaterializedValue
 import pekko.util.ConstantFun
@@ -423,6 +423,21 @@ object Source {
   def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
     fromGraph(new 
IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
 
+  /**
+   * Creates a `Source` from an array, if the array is empty, the stream is 
completed immediately,
+   * otherwise, every element of the array will be emitted sequentially.
+   *
+   * @since 1.3.0
+   */
+  def apply[T](array: Array[T]): Source[T, NotUsed] = {
+    if (array.length == 0)
+      empty
+    else if (array.length == 1)
+      single(array(0))
+    else
+      Source.fromGraph(new ArraySource[T](array))
+  }
+
   /**
    * Starts a new `Source` from the given `Future`. The stream will consist of
    * one element when the `Future` is completed with a successful value, which


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to