This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch 1.3.x-items in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 047dd0ba2e6ef115617b0e1be47b03694bf13d99 Author: He-Pin(kerr) <[email protected]> AuthorDate: Mon Nov 10 21:47:40 2025 +0800 feat: Add Source#items (#2429) (cherry picked from commit f0db8f0c3344a74bd41d1c127540bf5d28f56bd1) --- .../main/paradox/stream/operators/Source/items.md | 26 ++++++++++++++++++++++ docs/src/main/paradox/stream/operators/index.md | 2 ++ .../apache/pekko/stream/javadsl/SourceTest.java | 21 +++++++++++++++++ .../org/apache/pekko/stream/javadsl/Source.scala | 18 +++++++++++++++ .../org/apache/pekko/stream/scaladsl/Source.scala | 20 ++++++++++++++++- 5 files changed, 86 insertions(+), 1 deletion(-) diff --git a/docs/src/main/paradox/stream/operators/Source/items.md b/docs/src/main/paradox/stream/operators/Source/items.md new file mode 100644 index 0000000000..31cf9a0415 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source/items.md @@ -0,0 +1,26 @@ +# Source.items + +Create a `Source` from the given items. + +@ref[Source operators](../index.md#source-operators) + +## Signature + +@apidoc[Source.items](Source$) { scala="#items[T](items:T*):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]" java="#items[T](T)" } + + +## Description + +Create a `Source` from the given items. + +## Examples + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the items one by one + +**completes** when the last item has been emitted + +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 80ae73ef44..8bbf89a16f 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -27,6 +27,7 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad |Source|<a name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated by @ref[`Source.completionStageSource`](Source/completionStageSource.md).| |Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.| |Source|<a name="futuresource"></a>@ref[futureSource](Source/futureSource.md)|Streams the elements of the given future source once it successfully completes.| +|Source|<a name="items"></a>@ref[items](Source/items.md)|Create a `Source` from the given items.| |Source|<a name="iterate"></a>@ref[iterate](Source/iterate.md)|Creates a sequential `Source` by iterating with the given predicate, function and seed.| |Source|<a name="lazily"></a>@ref[lazily](Source/lazily.md)|Deprecated by @ref[`Source.lazySource`](Source/lazySource.md).| |Source|<a name="lazilyasync"></a>@ref[lazilyAsync](Source/lazilyAsync.md)|Deprecated by @ref[`Source.lazyFutureSource`](Source/lazyFutureSource.md).| @@ -533,6 +534,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [interleave](Source-or-Flow/interleave.md) * [interleaveAll](Source-or-Flow/interleaveAll.md) * [intersperse](Source-or-Flow/intersperse.md) +* [items](Source/items.md) * [iterate](Source/iterate.md) * [javaCollector](StreamConverters/javaCollector.md) * [javaCollectorParallelUnordered](StreamConverters/javaCollectorParallelUnordered.md) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 9ed206b3c3..bc4d67db7c 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -125,6 +125,27 @@ public class SourceTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToUseItems() { + Source.items("a", "b", "c") + .runWith(TestSink.create(system), system) + .ensureSubscription() + .request(3) + .expectNext("a") + .expectNext("b") + .expectNext("c") + .expectComplete(); + } + + @Test + public void mustBeAbleToUseItemsWhenEmpty() { + Source.<String>items() + .runWith(TestSink.create(system), system) + .ensureSubscription() + .request(1) + .expectComplete(); + } + @Test public void mustBeAbleToUseVoidTypeInForeach() { final TestKit probe = new TestKit(system); diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index c97b5ad8f0..7135becfb1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -302,6 +302,24 @@ object Source { def single[T](element: T): Source[T, NotUsed] = new Source(scaladsl.Source.single(element)) + /** + * Create a `Source` from the given elements. + * + * @since 1.3.0 + */ + @varargs + @SafeVarargs + @SuppressWarnings(Array("varargs")) + def items[T](items: T*): javadsl.Source[T, NotUsed] = { + if (items.isEmpty) { + empty() + } else if (items.length == 1) { + single(items.head) + } else { + new Source(scaladsl.Source(items.toIndexedSeq)) + } + } + /** * Create a `Source` that will continually emit the given element. */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 306dde3ec3..255b38b86a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl import java.util.concurrent.CompletionStage -import scala.annotation.{ nowarn, tailrec } +import scala.annotation.{ nowarn, tailrec, varargs } import scala.annotation.unchecked.uncheckedVariance import scala.collection.{ immutable, AbstractIterator } import scala.concurrent.{ Future, Promise } @@ -480,6 +480,24 @@ object Source { def single[T](element: T): Source[T, NotUsed] = fromGraph(new GraphStages.SingleSource(element)) + /** + * Create a `Source` from the given elements. + * + * @since 1.3.0 + */ + @varargs + @SafeVarargs + @SuppressWarnings(Array("varargs")) + def items[T](items: T*): Source[T, NotUsed] = { + if (items.isEmpty) { + empty[T] + } else if (items.length == 1) { + single(items.head) + } else { + Source(items.toIndexedSeq) + } + } + /** * Create a `Source` from an `Option` value, emitting the value if it is defined. * --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
