This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new a536fe38ed chore: Optimize Source.from iterable. (#2556)
a536fe38ed is described below

commit a536fe38ed9e189362b86d0eabf28726452cdc02
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Dec 7 18:19:58 2025 +0800

    chore: Optimize Source.from iterable. (#2556)
---
 .../java/org/apache/pekko/stream/javadsl/SourceTest.java  | 15 +++++++++++++++
 .../pekko/stream/scaladsl/FlowFlatMapPrefixSpec.scala     |  4 ++--
 .../org/apache/pekko/stream/scaladsl/SourceSpec.scala     |  9 +++++++++
 .../scala/org/apache/pekko/stream/javadsl/Source.scala    | 11 ++++++++---
 .../scala/org/apache/pekko/stream/scaladsl/Source.scala   | 12 +++++++++---
 5 files changed, 43 insertions(+), 8 deletions(-)

diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index f3d21d506a..dda5bb4bb7 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -84,6 +84,21 @@ public class SourceTest extends StreamTest {
     Source<Fruit, NotUsed> fruits = appleFruits.merge(orangeFruits);
   }
 
+  @Test
+  public void mustBeAbleToUseFromIterable() throws Exception {
+    final var result =
+        Source.combine(
+                Source.from(Collections.<String>emptyList()),
+                Source.from(Collections.singleton("a")),
+                List.of(Source.from(List.of("b", "c"))),
+                x -> Concat.<String>create(x, false))
+            .toMat(Sink.seq(), Keep.right())
+            .run(system)
+            .toCompletableFuture()
+            .get(3, TimeUnit.SECONDS);
+    assertEquals(List.of("a", "b", "c"), result);
+  }
+
   @Test
   public void mustBeAbleToUseSimpleOperators() {
     final TestKit probe = new TestKit(system);
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapPrefixSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapPrefixSpec.scala
index 1a8c1fca6d..6f6f25742e 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapPrefixSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapPrefixSpec.scala
@@ -664,7 +664,7 @@ class FlowFlatMapPrefixSpec extends StreamSpec("pekko.loglevel = debug") {
       }
 
       "complete when downstream cancels before pulling and upstream does not 
produce" in {
-        val fSeq = Source(List.empty[Int])
+        val fSeq = Source.lazySource(() => Source(List.empty[Int]))
           .flatMapPrefixMat(1) { prefix =>
             Flow[Int].mapMaterializedValue(_ => prefix)
           }(Keep.right)
@@ -680,7 +680,7 @@ class FlowFlatMapPrefixSpec extends StreamSpec("pekko.loglevel = debug") {
       }
 
       "complete when downstream cancels before pulling and upstream does not 
produce, prefix=0" in {
-        val fSeq = Source(List.empty[Int])
+        val fSeq = Source.lazySource(() => Source(List.empty[Int]))
           .flatMapPrefixMat(0) { prefix =>
             Flow[Int].mapMaterializedValue(_ => prefix)
           }(Keep.right)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 780760f6a7..72fd8af97c 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -199,6 +199,15 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
         immutable.Seq(1, 2, 3, 10, 20, 30))
     }
 
+    "combine using Concat strategy 3 inputs with simplified API" in {
+      val first = Source(List[String]())
+      val second = Source(List("a"))
+      val others = Source(List("b", "c"))
+      Source.combine(first, second, others)(x => Concat[String](x))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq("a", "b", "c"))
+    }
+
     "combine from two inputs with combinedMat and take a materialized value" in {
       val queueSource = Source.queue[Int](3)
       val intSeqSource = Source(1 to 3)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 89bef8f91f..f1bb9e7733 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -187,10 +187,15 @@ object Source {
     // this adapter is not immutable if the underlying java.lang.Iterable is modified
     // but there is not anything we can do to prevent that from happening.
     // ConcurrentModificationException will be thrown in some cases.
-    val scalaIterable = new immutable.Iterable[O] {
-      override def iterator: Iterator[O] = iterable.iterator().asScala
+    iterable match {
+      case c: java.util.Collection[O] if c.isEmpty     => empty()
+      case c: java.util.Collection[O] if c.size() == 1 => single(c.iterator().next())
+      case _                                           =>
+        val scalaIterable = new immutable.Iterable[O] {
+          override def iterator: Iterator[O] = iterable.iterator().asScala
+        }
+        new Source(scaladsl.Source(scalaIterable))
     }
-    new Source(scaladsl.Source(scalaIterable))
   }
 
   /**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 96b0c55c0d..c08b7de18a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
 
 import java.util.concurrent.CompletionStage
 
-import scala.annotation.{ tailrec, varargs }
+import scala.annotation.{ switch, tailrec, varargs }
 import scala.annotation.unchecked.uncheckedVariance
 import scala.collection.{ immutable, AbstractIterator }
 import scala.concurrent.{ Future, Promise }
@@ -402,8 +402,14 @@ object Source {
    * stream will see an individual flow of elements (always starting from the
    * beginning) regardless of when they subscribed.
    */
-  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
-    fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
+    (iterable.knownSize: @switch) match {
+      case 0 => empty
+      case 1 => single(iterable.head)
+      case _ =>
+        fromGraph(new IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+    }
+  }
 
   /**
    * Creates a `Source` from an array, if the array is empty, the stream is completed immediately,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to