This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch 1.3.x-fromArray in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 18e94b88dbd8747e250831c3f9a679a1f5f7fec7 Author: He-Pin <[email protected]> AuthorDate: Mon Nov 10 22:08:54 2025 +0800 feat: Add Source#apply for Array --- .../apache/pekko/stream/DslFactoriesConsistencySpec.scala | 1 + .../org/apache/pekko/stream/scaladsl/SourceSpec.scala | 12 ++++++++++++ .../scala/org/apache/pekko/stream/javadsl/Source.scala | 5 ++--- .../scala/org/apache/pekko/stream/scaladsl/Source.scala | 15 +++++++++++++++ 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala index 93defd9017..29565b52a4 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala @@ -55,6 +55,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers { ("apply" -> "fromGraph") :: ("apply" -> "fromIterator") :: ("apply" -> "fromFunctions") :: + ("apply" -> "fromArray") :: Nil // format: OFF diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index 87c20154d2..780760f6a7 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -62,6 +62,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout { c.expectComplete() } + "product elements with Array" in { + val p = Source(Array(1, 2, 3)).runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(3) + c.expectNext(1) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + "reject later subscriber" in { val p = Source.single(1).runWith(Sink.asPublisher(false)) val c1 = TestSubscriber.manualProbe[Int]() diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index c97b5ad8f0..58f769ab45 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -36,7 +36,7 @@ import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.{ ArraySource, StatefulMapConcat, ZipWithIndexJava } +import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava } import pekko.util.{ unused, _ } import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -201,8 +201,7 @@ object Source { * * @since 1.1.0 */ - def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source.fromGraph( - new ArraySource[T](array))) + def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source(array)) /** * Create a `Source` from an `Optional` value, emitting the value if it is present. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 306dde3ec3..20a94c01b9 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -423,6 +423,21 @@ object Source { def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource) + /** + * Creates a `Source` from an array, if the array is empty, the stream is completed immediately, + * otherwise, every element of the array will be emitted sequentially. + * + * @since 1.3.0 + */ + def apply[T](array: Array[T]): Source[T, NotUsed] = { + if (array.length == 0) + empty + else if (array.length == 1) + single(array(0)) + else + Source.fromGraph(new ArraySource[T](array)) + } + /** * Starts a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
