This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new fc45e50b4e feat: Add TraversalBuilder.getValuePresentedSource method 
for further optimization. (#1701)
fc45e50b4e is described below

commit fc45e50b4e3da0cdc64e50ff19da540003e26665
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Feb 17 10:30:02 2025 +0800

    feat: Add TraversalBuilder.getValuePresentedSource method for further 
optimization. (#1701)
---
 .../pekko/stream/impl/TraversalBuilderSpec.scala   | 96 ++++++++++++++++++++++
 .../stream/scaladsl/FlowFlattenMergeSpec.scala     | 14 ----
 .../apache/pekko/stream/impl/FailedSource.scala    |  2 +-
 .../pekko/stream/impl/JavaStreamSource.scala       |  2 +-
 .../pekko/stream/impl/TraversalBuilder.scala       | 44 +++++++++-
 5 files changed, 140 insertions(+), 18 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
index 063b9ec335..5bac7f5a3c 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/TraversalBuilderSpec.scala
@@ -17,9 +17,14 @@ import org.apache.pekko
 import pekko.NotUsed
 import pekko.stream._
 import pekko.stream.impl.TraversalTestUtils._
+import pekko.stream.impl.fusing.IterableSource
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
 import pekko.stream.scaladsl.{ Keep, Source }
+import pekko.util.OptionVal
 import pekko.testkit.PekkoSpec
 
+import scala.concurrent.Future
+
 class TraversalBuilderSpec extends PekkoSpec {
 
   "CompositeTraversalBuilder" must {
@@ -463,4 +468,95 @@ class TraversalBuilderSpec extends PekkoSpec {
     TraversalBuilder.isEmptySource(emptySource) should be(true)
   }
 
+  "find Source.single via TraversalBuilder" in {
+    TraversalBuilder.getSingleSource(Source.single("a")).get.elem should 
===("a")
+    TraversalBuilder.getSingleSource(Source(List("a", "b"))) should 
be(OptionVal.None)
+
+    val singleSourceA = new SingleSource("a")
+    TraversalBuilder.getSingleSource(singleSourceA) should 
be(OptionVal.Some(singleSourceA))
+
+    TraversalBuilder.getSingleSource(Source.single("c").async) should 
be(OptionVal.None)
+    TraversalBuilder.getSingleSource(Source.single("d").mapMaterializedValue(_ 
=> "Mat")) should be(OptionVal.None)
+  }
+
+  "find Source.single via TraversalBuilder with getValuePresentedSource" in {
+    
TraversalBuilder.getValuePresentedSource(Source.single("a")).get.asInstanceOf[SingleSource[String]].elem
 should ===(
+      "a")
+    val singleSourceA = new SingleSource("a")
+    TraversalBuilder.getValuePresentedSource(singleSourceA) should 
be(OptionVal.Some(singleSourceA))
+
+    TraversalBuilder.getValuePresentedSource(Source.single("c").async) should 
be(OptionVal.None)
+    
TraversalBuilder.getValuePresentedSource(Source.single("d").mapMaterializedValue(_
 => "Mat")) should be(
+      OptionVal.None)
+  }
+
+  "find Source.empty via TraversalBuilder with getValuePresentedSource" in {
+    val emptySource = EmptySource
+    TraversalBuilder.getValuePresentedSource(emptySource) should 
be(OptionVal.Some(emptySource))
+
+    TraversalBuilder.getValuePresentedSource(Source.empty.async) should 
be(OptionVal.None)
+    
TraversalBuilder.getValuePresentedSource(Source.empty.mapMaterializedValue(_ => 
"Mat")) should be(OptionVal.None)
+  }
+
+  "find javadsl Source.empty via TraversalBuilder with 
getValuePresentedSource" in {
+    import pekko.stream.javadsl.Source
+    val emptySource = Source.empty()
+    TraversalBuilder.getValuePresentedSource(Source.empty()) should 
be(OptionVal.Some(emptySource))
+
+    TraversalBuilder.getValuePresentedSource(Source.empty().async) should 
be(OptionVal.None)
+    
TraversalBuilder.getValuePresentedSource(Source.empty().mapMaterializedValue(_ 
=> "Mat")) should be(OptionVal.None)
+  }
+
+  "find Source.future via TraversalBuilder with getValuePresentedSource" in {
+    val future = Future.successful("a")
+    
TraversalBuilder.getValuePresentedSource(Source.future(future)).get.asInstanceOf[FutureSource[
+      String]].future should ===(
+      future)
+    val futureSourceA = new FutureSource(future)
+    TraversalBuilder.getValuePresentedSource(futureSourceA) should 
be(OptionVal.Some(futureSourceA))
+
+    TraversalBuilder.getValuePresentedSource(Source.future(future).async) 
should be(OptionVal.None)
+    
TraversalBuilder.getValuePresentedSource(Source.future(future).mapMaterializedValue(_
 => "Mat")) should be(
+      OptionVal.None)
+  }
+
+  "find Source.iterable via TraversalBuilder with getValuePresentedSource" in {
+    val iterable = List("a")
+    
TraversalBuilder.getValuePresentedSource(Source(iterable)).get.asInstanceOf[IterableSource[
+      String]].elements should ===(
+      iterable)
+    val iterableSource = new IterableSource(iterable)
+    TraversalBuilder.getValuePresentedSource(iterableSource) should 
be(OptionVal.Some(iterableSource))
+
+    TraversalBuilder.getValuePresentedSource(Source(iterable).async) should 
be(OptionVal.None)
+    
TraversalBuilder.getValuePresentedSource(Source(iterable).mapMaterializedValue(_
 => "Mat")) should be(
+      OptionVal.None)
+  }
+
+  "find Source.javaStreamSource via TraversalBuilder with 
getValuePresentedSource" in {
+    val javaStream = java.util.stream.Stream.empty[String]()
+    TraversalBuilder.getValuePresentedSource(Source.fromJavaStream(() => 
javaStream)).get
+      .asInstanceOf[JavaStreamSource[String, _]].open() shouldEqual javaStream
+    val streamSource = new JavaStreamSource(() => javaStream)
+    TraversalBuilder.getValuePresentedSource(streamSource) should 
be(OptionVal.Some(streamSource))
+
+    TraversalBuilder.getValuePresentedSource(Source.fromJavaStream(() => 
javaStream).async) should be(OptionVal.None)
+    TraversalBuilder.getValuePresentedSource(
+      Source.fromJavaStream(() => javaStream).mapMaterializedValue(_ => 
"Mat")) should be(
+      OptionVal.None)
+  }
+
+  "find Source.failed via TraversalBuilder with getValuePresentedSource" in {
+    val failure = new RuntimeException("failure")
+    
TraversalBuilder.getValuePresentedSource(Source.failed(failure)).get.asInstanceOf[FailedSource[String]]
+      .failure should ===(
+      failure)
+    val failedSourceA = new FailedSource(failure)
+    TraversalBuilder.getValuePresentedSource(failedSourceA) should 
be(OptionVal.Some(failedSourceA))
+
+    TraversalBuilder.getValuePresentedSource(Source.failed(failure).async) 
should be(OptionVal.None)
+    
TraversalBuilder.getValuePresentedSource(Source.failed(failure).mapMaterializedValue(_
 => "Mat")) should be(
+      OptionVal.None)
+  }
+
 }
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
index 99b342d74b..7d4cf5de37 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlattenMergeSpec.scala
@@ -19,8 +19,6 @@ import scala.concurrent.duration._
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.stream._
-import pekko.stream.impl.TraversalBuilder
-import pekko.stream.impl.fusing.GraphStages.SingleSource
 import pekko.stream.stage.GraphStage
 import pekko.stream.stage.GraphStageLogic
 import pekko.stream.stage.OutHandler
@@ -29,7 +27,6 @@ import pekko.stream.testkit.TestPublisher
 import pekko.stream.testkit.Utils.TE
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.testkit.TestLatch
-import pekko.util.OptionVal
 
 import org.scalatest.exceptions.TestFailedException
 
@@ -283,16 +280,5 @@ class FlowFlattenMergeSpec extends StreamSpec {
       probe.expectComplete()
     }
 
-    "find Source.single via TraversalBuilder" in {
-      TraversalBuilder.getSingleSource(Source.single("a")).get.elem should 
===("a")
-      TraversalBuilder.getSingleSource(Source(List("a", "b"))) should 
be(OptionVal.None)
-
-      val singleSourceA = new SingleSource("a")
-      TraversalBuilder.getSingleSource(singleSourceA) should 
be(OptionVal.Some(singleSourceA))
-
-      TraversalBuilder.getSingleSource(Source.single("c").async) should 
be(OptionVal.None)
-      
TraversalBuilder.getSingleSource(Source.single("d").mapMaterializedValue(_ => 
"Mat")) should be(OptionVal.None)
-    }
-
   }
 }
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala
index 4ab1c25355..b107857f86 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/FailedSource.scala
@@ -22,7 +22,7 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, 
OutHandler }
 /**
  * INTERNAL API
  */
-@InternalApi private[pekko] final class FailedSource[T](failure: Throwable) 
extends GraphStage[SourceShape[T]] {
+@InternalApi private[pekko] final class FailedSource[T](val failure: 
Throwable) extends GraphStage[SourceShape[T]] {
   val out = Outlet[T]("FailedSource.out")
   override val shape = SourceShape(out)
 
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala
index 74bba55d0a..d05625fee1 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala
@@ -23,7 +23,7 @@ import java.util.function.Consumer
 
 /** INTERNAL API */
 @InternalApi private[stream] final class JavaStreamSource[T, S <: 
java.util.stream.BaseStream[T, S]](
-    open: () => java.util.stream.BaseStream[T, S])
+    val open: () => java.util.stream.BaseStream[T, S])
     extends GraphStage[SourceShape[T]] {
 
   val out: Outlet[T] = Outlet("JavaStreamSource")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
index 22c006601b..24410e0f6d 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/TraversalBuilder.scala
@@ -21,8 +21,8 @@ import pekko.annotation.{ DoNotInherit, InternalApi }
 import pekko.stream._
 import pekko.stream.impl.StreamLayout.AtomicModule
 import pekko.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
-import pekko.stream.impl.fusing.GraphStageModule
-import pekko.stream.impl.fusing.GraphStages.SingleSource
+import pekko.stream.impl.fusing.{ GraphStageModule, IterableSource }
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
 import pekko.stream.scaladsl.Keep
 import pekko.util.OptionVal
 import pekko.util.unused
@@ -380,6 +380,46 @@ import pekko.util.unused
     }
   }
 
+  /**
+   * Try to find `SingleSource` or wrapped such. This is used as a
+   * performance optimization in FlattenConcat and possibly other places.
+   * @since 1.2.0
+   */
+  @InternalApi def getValuePresentedSource[A >: Null](
+      graph: Graph[SourceShape[A], _]): OptionVal[Graph[SourceShape[A], _]] = {
+    def isValuePresentedSource(graph: Graph[SourceShape[_ <: A], _]): Boolean 
= graph match {
+      case _: SingleSource[_] | _: FutureSource[_] | _: IterableSource[_] | _: 
JavaStreamSource[_, _] |
+          _: FailedSource[_] =>
+        true
+      case maybeEmpty if isEmptySource(maybeEmpty) => true
+      case _                                       => false
+    }
+    graph match {
+      case _ if isValuePresentedSource(graph) => OptionVal.Some(graph)
+      case _ =>
+        graph.traversalBuilder match {
+          case l: LinearTraversalBuilder =>
+            l.pendingBuilder match {
+              case OptionVal.Some(a: AtomicTraversalBuilder) =>
+                a.module match {
+                  case m: GraphStageModule[_, _] =>
+                    m.stage match {
+                      case _ if 
isValuePresentedSource(m.stage.asInstanceOf[Graph[SourceShape[A], _]]) =>
+                        // It would be != EmptyTraversal if 
mapMaterializedValue was used and then we can't optimize.
+                        if ((l.traversalSoFar eq EmptyTraversal) && 
!l.attributes.isAsync)
+                          
OptionVal.Some(m.stage.asInstanceOf[Graph[SourceShape[A], _]])
+                        else OptionVal.None
+                      case _ => OptionVal.None
+                    }
+                  case _ => OptionVal.None
+                }
+              case _ => OptionVal.None
+            }
+          case _ => OptionVal.None
+        }
+    }
+  }
+
   /**
    * Test if a Graph is an empty Source.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to