This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch collection in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 174433ed48f6f45311aef259e22b90acfafce722 Author: He-Pin <[email protected]> AuthorDate: Sat Dec 6 20:51:22 2025 +0800 chore: Optimize Source.from iterable. --- .../java/org/apache/pekko/stream/javadsl/SourceTest.java | 15 +++++++++++++++ .../org/apache/pekko/stream/scaladsl/SourceSpec.scala | 9 +++++++++ .../scala/org/apache/pekko/stream/javadsl/Source.scala | 11 ++++++++--- .../scala/org/apache/pekko/stream/scaladsl/Source.scala | 12 +++++++++--- 4 files changed, 41 insertions(+), 6 deletions(-) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index f3d21d506a..dda5bb4bb7 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -84,6 +84,21 @@ public class SourceTest extends StreamTest { Source<Fruit, NotUsed> fruits = appleFruits.merge(orangeFruits); } + @Test + public void mustBeAbleToUseFromIterable() throws Exception { + final var result = + Source.combine( + Source.from(Collections.<String>emptyList()), + Source.from(Collections.singleton("a")), + List.of(Source.from(List.of("b", "c"))), + x -> Concat.<String>create(x, false)) + .toMat(Sink.seq(), Keep.right()) + .run(system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + assertEquals(List.of("a", "b", "c"), result); + } + @Test public void mustBeAbleToUseSimpleOperators() { final TestKit probe = new TestKit(system); diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index 780760f6a7..72fd8af97c 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -199,6 +199,15 @@ class SourceSpec extends StreamSpec with DefaultTimeout { immutable.Seq(1, 2, 3, 10, 20, 30)) } + "combine using Concat strategy 3 inputs with simplified API" in { + val first = Source(List[String]()) + val second = Source(List("a")) + val others = Source(List("b", "c")) + Source.combine(first, second, others)(x => Concat[String](x)) + .runWith(Sink.seq) + .futureValue should ===(immutable.Seq("a", "b", "c")) + } + "combine from two inputs with combinedMat and take a materialized value" in { val queueSource = Source.queue[Int](3) val intSeqSource = Source(1 to 3) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 89bef8f91f..f1bb9e7733 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -187,10 +187,15 @@ object Source { // this adapter is not immutable if the underlying java.lang.Iterable is modified // but there is not anything we can do to prevent that from happening. // ConcurrentModificationException will be thrown in some cases. - val scalaIterable = new immutable.Iterable[O] { - override def iterator: Iterator[O] = iterable.iterator().asScala + iterable match { + case c: java.util.Collection[O] if c.isEmpty => empty() + case c: java.util.Collection[O] if c.size() == 1 => single(c.iterator().next()) + case _ => + val scalaIterable = new immutable.Iterable[O] { + override def iterator: Iterator[O] = iterable.iterator().asScala + } + new Source(scaladsl.Source(scalaIterable)) } - new Source(scaladsl.Source(scalaIterable)) } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 96b0c55c0d..c08b7de18a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl import java.util.concurrent.CompletionStage -import scala.annotation.{ tailrec, varargs } +import scala.annotation.{ switch, tailrec, varargs } import scala.annotation.unchecked.uncheckedVariance import scala.collection.{ immutable, AbstractIterator } import scala.concurrent.{ Future, Promise } @@ -402,8 +402,14 @@ object Source { * stream will see an individual flow of elements (always starting from the * beginning) regardless of when they subscribed. */ - def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = - fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource) + def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = { + (iterable.knownSize: @switch) match { + case 0 => empty + case 1 => single(iterable.head) + case _ => + fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource) + } + } /** * Creates a `Source` from an array, if the array is empty, the stream is completed immediately, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
