This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.3.x-fromArray
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 18e94b88dbd8747e250831c3f9a679a1f5f7fec7
Author: He-Pin <[email protected]>
AuthorDate: Mon Nov 10 22:08:54 2025 +0800

    feat: Add Source#apply for Array
---
 .../apache/pekko/stream/DslFactoriesConsistencySpec.scala |  1 +
 .../org/apache/pekko/stream/scaladsl/SourceSpec.scala     | 12 ++++++++++++
 .../scala/org/apache/pekko/stream/javadsl/Source.scala    |  5 ++---
 .../scala/org/apache/pekko/stream/scaladsl/Source.scala   | 15 +++++++++++++++
 4 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
index 93defd9017..29565b52a4 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
@@ -55,6 +55,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers {
     ("apply" -> "fromGraph") ::
     ("apply" -> "fromIterator") ::
     ("apply" -> "fromFunctions") ::
+    ("apply" -> "fromArray") ::
     Nil
 
   // format: OFF
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 87c20154d2..780760f6a7 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -62,6 +62,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
       c.expectComplete()
     }
 
+    "produce elements with Array" in { // Source(Array) must emit every element in order, then complete
+      val p = Source(Array(1, 2, 3)).runWith(Sink.asPublisher(false))
+      val c = TestSubscriber.manualProbe[Int]()
+      p.subscribe(c)
+      val sub = c.expectSubscription()
+      sub.request(3) // demand all three elements at once
+      c.expectNext(1)
+      c.expectNext(2)
+      c.expectNext(3)
+      c.expectComplete() // completion is expected right after the last array element
+    }
+
     "reject later subscriber" in {
       val p = Source.single(1).runWith(Sink.asPublisher(false))
       val c1 = TestSubscriber.manualProbe[Int]()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index c97b5ad8f0..58f769ab45 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -36,7 +36,7 @@ import pekko.japi.function.Creator
 import pekko.stream._
 import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava }
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ ArraySource, StatefulMapConcat, ZipWithIndexJava }
+import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
 import pekko.util.{ unused, _ }
 import pekko.util.FutureConverters._
 import pekko.util.JavaDurationConverters._
@@ -201,8 +201,7 @@ object Source {
    *
    * @since 1.1.0
    */
-  def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source.fromGraph(
-    new ArraySource[T](array)))
+  def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source(array))
 
   /**
    * Create a `Source` from an `Optional` value, emitting the value if it is present.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 306dde3ec3..20a94c01b9 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -423,6 +423,21 @@ object Source {
   def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
     fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
 
+  /**
+   * Creates a `Source` from an array. If the array is empty, the stream is
+   * completed immediately; otherwise every element is emitted sequentially.
+   *
+   * @since 1.3.0
+   */
+  def apply[T](array: Array[T]): Source[T, NotUsed] = {
+    if (array.length == 0)
+      empty // no elements: delegate to `empty`
+    else if (array.length == 1)
+      single(array(0)) // exactly one element: delegate to `single`
+    else
+      Source.fromGraph(new ArraySource[T](array))
+  }
+
   /**
    * Starts a new `Source` from the given `Future`. The stream will consist of
    * one element when the `Future` is completed with a successful value, which


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to