This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch collection
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 174433ed48f6f45311aef259e22b90acfafce722
Author: He-Pin <[email protected]>
AuthorDate: Sat Dec 6 20:51:22 2025 +0800

    chore: Optimize Source.from iterable.
---
 .../java/org/apache/pekko/stream/javadsl/SourceTest.java  | 15 +++++++++++++++
 .../org/apache/pekko/stream/scaladsl/SourceSpec.scala     |  9 +++++++++
 .../scala/org/apache/pekko/stream/javadsl/Source.scala    | 11 ++++++++---
 .../scala/org/apache/pekko/stream/scaladsl/Source.scala   | 12 +++++++++---
 4 files changed, 41 insertions(+), 6 deletions(-)

diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index f3d21d506a..dda5bb4bb7 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -84,6 +84,21 @@ public class SourceTest extends StreamTest {
     Source<Fruit, NotUsed> fruits = appleFruits.merge(orangeFruits);
   }
 
+  @Test
+  public void mustBeAbleToUseFromIterable() throws Exception {
+    final var result =
+        Source.combine(
+                Source.from(Collections.<String>emptyList()),
+                Source.from(Collections.singleton("a")),
+                List.of(Source.from(List.of("b", "c"))),
+                x -> Concat.<String>create(x, false))
+            .toMat(Sink.seq(), Keep.right())
+            .run(system)
+            .toCompletableFuture()
+            .get(3, TimeUnit.SECONDS);
+    assertEquals(List.of("a", "b", "c"), result);
+  }
+
   @Test
   public void mustBeAbleToUseSimpleOperators() {
     final TestKit probe = new TestKit(system);
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 780760f6a7..72fd8af97c 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -199,6 +199,15 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
         immutable.Seq(1, 2, 3, 10, 20, 30))
     }
 
+    "combine using Concat strategy 3 inputs with simplified API" in {
+      val first = Source(List[String]())
+      val second = Source(List("a"))
+      val others = Source(List("b", "c"))
+      Source.combine(first, second, others)(x => Concat[String](x))
+        .runWith(Sink.seq)
+        .futureValue should ===(immutable.Seq("a", "b", "c"))
+    }
+
     "combine from two inputs with combinedMat and take a materialized value" 
in {
       val queueSource = Source.queue[Int](3)
       val intSeqSource = Source(1 to 3)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 89bef8f91f..f1bb9e7733 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -187,10 +187,15 @@ object Source {
     // this adapter is not immutable if the underlying java.lang.Iterable is 
modified
     // but there is not anything we can do to prevent that from happening.
     // ConcurrentModificationException will be thrown in some cases.
-    val scalaIterable = new immutable.Iterable[O] {
-      override def iterator: Iterator[O] = iterable.iterator().asScala
+    iterable match {
+      case c: java.util.Collection[O] if c.isEmpty     => empty()
+      case c: java.util.Collection[O] if c.size() == 1 => 
single(c.iterator().next())
+      case _                                           =>
+        val scalaIterable = new immutable.Iterable[O] {
+          override def iterator: Iterator[O] = iterable.iterator().asScala
+        }
+        new Source(scaladsl.Source(scalaIterable))
     }
-    new Source(scaladsl.Source(scalaIterable))
   }
 
   /**
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 96b0c55c0d..c08b7de18a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
 
 import java.util.concurrent.CompletionStage
 
-import scala.annotation.{ tailrec, varargs }
+import scala.annotation.{ switch, tailrec, varargs }
 import scala.annotation.unchecked.uncheckedVariance
 import scala.collection.{ immutable, AbstractIterator }
 import scala.concurrent.{ Future, Promise }
@@ -402,8 +402,14 @@ object Source {
    * stream will see an individual flow of elements (always starting from the
    * beginning) regardless of when they subscribed.
    */
-  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
-    fromGraph(new 
IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
+    (iterable.knownSize: @switch) match {
+      case 0 => empty
+      case 1 => single(iterable.head)
+      case _ =>
+        fromGraph(new 
IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+    }
+  }
 
   /**
    * Creates a `Source` from an array, if the array is empty, the stream is 
completed immediately,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to