This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch fromArray in repository https://gitbox.apache.org/repos/asf/pekko.git
commit b369a5d8ef215b59a4c4247e5cdeb7e78cbb68f4 Author: He-Pin <[email protected]> AuthorDate: Sun Nov 2 17:08:15 2025 +0800 feat: Add fromArray to Source scaladsl --- .../main/paradox/stream/operators/Source/fromArray.md | 2 +- .../scala/org/apache/pekko/stream/javadsl/Source.scala | 5 ++--- .../org/apache/pekko/stream/scaladsl/Source.scala | 18 +++++++++++++++++- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source/fromArray.md b/docs/src/main/paradox/stream/operators/Source/fromArray.md index 0e0eb34e7a..0d4a7969d1 100644 --- a/docs/src/main/paradox/stream/operators/Source/fromArray.md +++ b/docs/src/main/paradox/stream/operators/Source/fromArray.md @@ -6,7 +6,7 @@ Stream the values of an `array`. ## Signature -@apidoc[Source.from](Source$) { java="#fromArray(java.lang.Object[])" } +@apidoc[Source.fromArray](Source$) { scala="#apply[T](array:scala.Array[T]):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]" java="#fromArray(java.lang.Object[])" } ## Description diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 55128fd3a1..412af13d34 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -38,7 +38,7 @@ import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.{ ArraySource, StatefulMapConcat, ZipWithIndexJava } +import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava } import pekko.util._ import org.reactivestreams.{ Publisher, Subscriber } @@ -199,8 +199,7 @@ object Source { * * @since 1.1.0 */ - def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source.fromGraph( - new ArraySource[T](array))) + def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source.fromArray(array)) /** * Create a `Source` from an `Optional` value, emitting the value if it is present. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index a50d7bcd8e..e6f587cd2a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -30,7 +30,7 @@ import pekko.annotation.InternalApi import pekko.stream._ import pekko.stream.impl._ import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.{ GraphStages, IterableSource, LazyFutureSource, LazySingleSource } +import pekko.stream.impl.fusing.{ ArraySource, GraphStages, IterableSource, LazyFutureSource, LazySingleSource } import pekko.stream.impl.fusing.GraphStages._ import pekko.stream.stage.GraphStageWithMaterializedValue import pekko.util.ConstantFun @@ -422,6 +422,22 @@ object Source { def single[T](element: T): Source[T, NotUsed] = fromGraph(new GraphStages.SingleSource(element)) + /** + * Creates a `Source` from an array, if the array is empty, the stream is completed immediately, + * otherwise, every element of the array will be emitted sequentially. + * + * @since 1.3.0 + */ + def fromArray[T](array: Array[T]): Source[T, NotUsed] = { + if (array.length == 0) { + empty + } else if (array.length == 1) { + single(array(0)) + } else { + Source.fromGraph(new ArraySource[T](array)) + } + } + /** * Create a `Source` from an `Option` value, emitting the value if it is defined. * --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
