This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch 1.3.x-fromArray in repository https://gitbox.apache.org/repos/asf/pekko.git
commit bedc467d60c9cc6faf42171e3dac1aa40480cee8 Author: He-Pin(kerr) <[email protected]> AuthorDate: Mon Nov 3 04:44:03 2025 +0800 feat: Add fromArray to Source scaladsl (#2424) (cherry picked from commit 2c2daa88a10a4858a77311928cd405690a3fd50b) --- .../main/paradox/stream/operators/Source/fromArray.md | 2 +- .../scala/org/apache/pekko/stream/javadsl/Source.scala | 6 +++--- .../org/apache/pekko/stream/scaladsl/Source.scala | 18 +++++++++++++++++- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source/fromArray.md b/docs/src/main/paradox/stream/operators/Source/fromArray.md index 0e0eb34e7a..0d4a7969d1 100644 --- a/docs/src/main/paradox/stream/operators/Source/fromArray.md +++ b/docs/src/main/paradox/stream/operators/Source/fromArray.md @@ -6,7 +6,7 @@ Stream the values of an `array`. ## Signature -@apidoc[Source.from](Source$) { java="#fromArray(java.lang.Object[])" } +@apidoc[Source.fromArray](Source$) { scala="#apply[T](array:scala.Array[T]):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]" java="#fromArray(java.lang.Object[])" } ## Description diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index c860816154..d4a2a0e022 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -35,13 +35,14 @@ import pekko.japi.{ function, JavaPartialFunction, Pair } import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } -import pekko.stream.impl.fusing.{ ArraySource, StatefulMapConcat, ZipWithIndexJava } +import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava } import pekko.util.{ unused, _ } import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ import pekko.util.OptionConverters._ import pekko.util.ccompat.JavaConverters._ import pekko.util.ccompat._ + import org.reactivestreams.{ Publisher, Subscriber } /** Java API */ @@ -200,8 +201,7 @@ object Source { * * @since 1.1.0 */ - def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source.fromGraph( - new ArraySource[T](array))) + def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source.fromArray(array)) /** * Create a `Source` from an `Optional` value, emitting the value if it is present. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 306dde3ec3..e6590cc88a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -29,7 +29,7 @@ import pekko.annotation.InternalApi import pekko.stream._ import pekko.stream.impl._ import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.{ GraphStages, IterableSource, LazyFutureSource, LazySingleSource } +import pekko.stream.impl.fusing.{ ArraySource, GraphStages, IterableSource, LazyFutureSource, LazySingleSource } import pekko.stream.impl.fusing.GraphStages._ import pekko.stream.stage.GraphStageWithMaterializedValue import pekko.util.ConstantFun @@ -480,6 +480,22 @@ object Source { def single[T](element: T): Source[T, NotUsed] = fromGraph(new GraphStages.SingleSource(element)) + /** + * Creates a `Source` from an array, if the array is empty, the stream is completed immediately, + * otherwise, every element of the array will be emitted sequentially. + * + * @since 1.3.0 + */ + def fromArray[T](array: Array[T]): Source[T, NotUsed] = { + if (array.length == 0) { + empty + } else if (array.length == 1) { + single(array(0)) + } else { + Source.fromGraph(new ArraySource[T](array)) + } + } + /** * Create a `Source` from an `Option` value, emitting the value if it is defined. * --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
