This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new a536fe38ed chore: Optimize Source.from iterable. (#2556)
a536fe38ed is described below
commit a536fe38ed9e189362b86d0eabf28726452cdc02
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Dec 7 18:19:58 2025 +0800
chore: Optimize Source.from iterable. (#2556)
---
.../java/org/apache/pekko/stream/javadsl/SourceTest.java | 15 +++++++++++++++
.../pekko/stream/scaladsl/FlowFlatMapPrefixSpec.scala | 4 ++--
.../org/apache/pekko/stream/scaladsl/SourceSpec.scala | 9 +++++++++
.../scala/org/apache/pekko/stream/javadsl/Source.scala | 11 ++++++++---
.../scala/org/apache/pekko/stream/scaladsl/Source.scala | 12 +++++++++---
5 files changed, 43 insertions(+), 8 deletions(-)
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index f3d21d506a..dda5bb4bb7 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -84,6 +84,21 @@ public class SourceTest extends StreamTest {
Source<Fruit, NotUsed> fruits = appleFruits.merge(orangeFruits);
}
+ @Test
+ public void mustBeAbleToUseFromIterable() throws Exception {
+ final var result =
+ Source.combine(
+ Source.from(Collections.<String>emptyList()),
+ Source.from(Collections.singleton("a")),
+ List.of(Source.from(List.of("b", "c"))),
+ x -> Concat.<String>create(x, false))
+ .toMat(Sink.seq(), Keep.right())
+ .run(system)
+ .toCompletableFuture()
+ .get(3, TimeUnit.SECONDS);
+ assertEquals(List.of("a", "b", "c"), result);
+ }
+
@Test
public void mustBeAbleToUseSimpleOperators() {
final TestKit probe = new TestKit(system);
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapPrefixSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapPrefixSpec.scala
index 1a8c1fca6d..6f6f25742e 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapPrefixSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapPrefixSpec.scala
@@ -664,7 +664,7 @@ class FlowFlatMapPrefixSpec extends
StreamSpec("pekko.loglevel = debug") {
}
"complete when downstream cancels before pulling and upstream does not
produce" in {
- val fSeq = Source(List.empty[Int])
+ val fSeq = Source.lazySource(() => Source(List.empty[Int]))
.flatMapPrefixMat(1) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix)
}(Keep.right)
@@ -680,7 +680,7 @@ class FlowFlatMapPrefixSpec extends
StreamSpec("pekko.loglevel = debug") {
}
"complete when downstream cancels before pulling and upstream does not
produce, prefix=0" in {
- val fSeq = Source(List.empty[Int])
+ val fSeq = Source.lazySource(() => Source(List.empty[Int]))
.flatMapPrefixMat(0) { prefix =>
Flow[Int].mapMaterializedValue(_ => prefix)
}(Keep.right)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index 780760f6a7..72fd8af97c 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -199,6 +199,15 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
immutable.Seq(1, 2, 3, 10, 20, 30))
}
+ "combine using Concat strategy 3 inputs with simplified API" in {
+ val first = Source(List[String]())
+ val second = Source(List("a"))
+ val others = Source(List("b", "c"))
+ Source.combine(first, second, others)(x => Concat[String](x))
+ .runWith(Sink.seq)
+ .futureValue should ===(immutable.Seq("a", "b", "c"))
+ }
+
"combine from two inputs with combinedMat and take a materialized value"
in {
val queueSource = Source.queue[Int](3)
val intSeqSource = Source(1 to 3)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 89bef8f91f..f1bb9e7733 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -187,10 +187,15 @@ object Source {
// this adapter is not immutable if the underlying java.lang.Iterable is
modified
// but there is not anything we can do to prevent that from happening.
// ConcurrentModificationException will be thrown in some cases.
- val scalaIterable = new immutable.Iterable[O] {
- override def iterator: Iterator[O] = iterable.iterator().asScala
+ iterable match {
+ case c: java.util.Collection[O] if c.isEmpty => empty()
+ case c: java.util.Collection[O] if c.size() == 1 =>
single(c.iterator().next())
+ case _ =>
+ val scalaIterable = new immutable.Iterable[O] {
+ override def iterator: Iterator[O] = iterable.iterator().asScala
+ }
+ new Source(scaladsl.Source(scalaIterable))
}
- new Source(scaladsl.Source(scalaIterable))
}
/**
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index 96b0c55c0d..c08b7de18a 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl
import java.util.concurrent.CompletionStage
-import scala.annotation.{ tailrec, varargs }
+import scala.annotation.{ switch, tailrec, varargs }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.{ immutable, AbstractIterator }
import scala.concurrent.{ Future, Promise }
@@ -402,8 +402,14 @@ object Source {
* stream will see an individual flow of elements (always starting from the
* beginning) regardless of when they subscribed.
*/
- def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
- fromGraph(new
IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+ def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = {
+ (iterable.knownSize: @switch) match {
+ case 0 => empty
+ case 1 => single(iterable.head)
+ case _ =>
+ fromGraph(new
IterableSource[T](iterable)).withAttributes(DefaultAttributes.iterableSource)
+ }
+ }
/**
* Creates a `Source` from an array, if the array is empty, the stream is
completed immediately,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]