This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch hepin-gather-statefulmap-coverage-v2
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 429a3e06797a3b34b1434b69fdb65a62da581fb8
Author: He-Pin <[email protected]>
AuthorDate: Tue Apr 7 03:53:30 2026 +0800

    stream: address deep review findings for gather operator
    
    - Add Gatherers.oneToOne() factory methods for Java DSL hot path access
    - Fix singleCollector.push to correctly handle 3+ outputs per gather call
    - Null out pendingOverflow on restart to prevent memory accumulation
    - Add null check on factory result to catch invalid factories early
    - Expand SubFlow/SubSource gather documentation for Java DSL
    - Align Scala/Java DSL documentation language
    - Add tests: materialization independence, empty upstream,
      onComplete null emission, multi-output backpressure
    
    🤖 Generated with [Qoder](https://qoder.com)
---
 .../pekko/stream/scaladsl/FlowGatherSpec.scala     | 78 ++++++++++++++++++++++
 .../org/apache/pekko/stream/impl/fusing/Ops.scala  | 44 ++++++++----
 .../org/apache/pekko/stream/javadsl/Flow.scala     |  2 +-
 .../org/apache/pekko/stream/javadsl/Gather.scala   | 52 ++++++++++++++-
 .../org/apache/pekko/stream/javadsl/Source.scala   |  2 +-
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 22 +++++-
 .../apache/pekko/stream/javadsl/SubSource.scala    | 22 +++++-
 .../org/apache/pekko/stream/scaladsl/Gather.scala  | 35 ++++++++++
 8 files changed, 235 insertions(+), 22 deletions(-)

diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
index e8f1e9ed32..6e2b1b8dcc 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
@@ -738,6 +738,84 @@ class FlowGatherSpec extends StreamSpec {
     }
   }
 
+  "create independent gatherer instances per materialization" in {
+    val stateCounter = new AtomicInteger(0)
+    val flow = Flow[Int]
+      .gather(() => {
+        stateCounter.incrementAndGet()
+        new Gatherer[Int, Int] {
+          private var acc = 0
+          override def apply(elem: Int, collector: GatherCollector[Int]): Unit = {
+            acc += elem
+            collector.push(acc)
+          }
+        }
+      })
+
+    val source1 = Source(1 to 3).via(flow).runWith(TestSink[Int]())
+    val source2 = Source(10 to 12).via(flow).runWith(TestSink[Int]())
+
+    source1.request(3)
+      .expectNext(1, 3, 6)
+      .expectComplete()
+    source2.request(3)
+      .expectNext(10, 21, 33)
+      .expectComplete()
+
+    // Factory should be called once per materialization
+    stateCounter.get() shouldBe 2
+  }
+
+  "call onComplete for empty upstream" in {
+    val gate = BeenCalledTimesGate()
+    Source.empty[Int]
+      .gather(() =>
+        new Gatherer[Int, Int] {
+          override def apply(elem: Int, collector: GatherCollector[Int]): Unit =
+            collector.push(elem)
+
+          override def onComplete(collector: GatherCollector[Int]): Unit =
+            gate.mark()
+        })
+      .runWith(TestSink[Int]())
+      .request(1)
+      .expectComplete()
+    gate.ensure()
+  }
+
+  "fail when onComplete emits null" in {
+    Source.single(1)
+      .gather(() =>
+        new Gatherer[Int, String] {
+          override def apply(elem: Int, collector: GatherCollector[String]): Unit = ()
+          override def onComplete(collector: GatherCollector[String]): Unit =
+            collector.push(null.asInstanceOf[String])
+        })
+      .runWith(TestSink[String]())
+      .request(1)
+      .expectError() shouldBe a[NullPointerException]
+  }
+
+  "handle 3+ outputs with backpressure mid-drain" in {
+    Source.single(1)
+      .gather(() =>
+        new Gatherer[Int, Int] {
+          override def apply(elem: Int, collector: GatherCollector[Int]): Unit = {
+            collector.push(elem)
+            collector.push(elem + 1)
+            collector.push(elem + 2)
+            collector.push(elem + 3)
+          }
+        })
+      .runWith(TestSink[Int]())
+      .request(2)
+      .expectNext(1, 2)
+      .expectNoMessage(200.millis)
+      .request(2)
+      .expectNext(3, 4)
+      .expectComplete()
+  }
+
   "support junction output ports" in {
     val source = Source(List((1, 1), (2, 2)))
     val graph = RunnableGraph.fromGraph(GraphDSL.createGraph(TestSink[(Int, Int)]()) { implicit b => sink =>
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index a2e6d1d9db..69beb30715 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -55,6 +55,7 @@ import pekko.stream.scaladsl.{
   StatefulMapConcatAccumulator,
   StatefulMapConcatAccumulatorFactory
 }
+import pekko.stream.javadsl.{ Gatherers => JGatherers }
 import pekko.stream.stage._
 import pekko.util.{ ConstantFun, OptionVal }
 
@@ -2364,17 +2365,25 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext
       private val singleCollector = new GatherCollector[Out] {
         override def push(elem: Out): Unit = {
           ReactiveStreamsCompliance.requireNonNullElement(elem)
-          val cb = callbackFirst
-          if (cb.asInstanceOf[AnyRef] eq null) {
-            callbackFirst = elem
-          } else {
-            pendingFirst = cb
-            hasPendingFirst = true
-            callbackFirst = null.asInstanceOf[Out]
-            multiMode = true
+          if (hasPendingFirst) {
+            // Already in multi mode: all pushes go directly to overflow queue.
             if (pendingOverflow eq null)
               pendingOverflow = new java.util.ArrayDeque[Out]()
             pendingOverflow.addLast(elem)
+          } else {
+            val cb = callbackFirst
+            if (cb.asInstanceOf[AnyRef] eq null) {
+              callbackFirst = elem
+            } else {
+              // Second output from this gather call: transition to multi mode.
+              pendingFirst = cb
+              hasPendingFirst = true
+              callbackFirst = null.asInstanceOf[Out]
+              multiMode = true
+              if (pendingOverflow eq null)
+                pendingOverflow = new java.util.ArrayDeque[Out]()
+              pendingOverflow.addLast(elem)
+            }
           }
         }
       }
@@ -2391,7 +2400,8 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext
       private var hasPendingFirst = false
       private var multiMode = false
       private var gatherer: Gatherer[In, Out] = _
-      private var oneToOneGatherer: OneToOneGatherer[In, Out] = _
+      // Hot-path handle for one-to-one mappings. Supports both Scala and Java DSL implementations.
+      private var oneToOneGatherer: AnyRef = _
       private var finalAction = FinalAction.None
       private var finalFailure: Throwable = null
       private var needInvokeOnCompleteCallback = false
@@ -2466,7 +2476,10 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext
       }
 
       private def onPushOneToOne(): Unit = {
-        val elem = oneToOneGatherer.applyOne(grab(in))
+        val elem = oneToOneGatherer match {
+          case s: OneToOneGatherer[In, Out] @unchecked   => s.applyOne(grab(in))
+          case j: JGatherers.OneToOneGatherer[In, Out] @unchecked => j.applyOne(grab(in))
+        }
         ReactiveStreamsCompliance.requireNonNullElement(elem)
         if (isAvailable(out))
           push(out, elem)
@@ -2543,12 +2556,17 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext
           pull(in)
 
       private def restartGatherer(): Unit = {
-        gatherer = factory()
+        val newGatherer = factory()
+        if (newGatherer eq null)
+          throw new IllegalStateException("Gatherer factory must not return null")
+        gatherer = newGatherer
         oneToOneGatherer = gatherer match {
-          case specialized: OneToOneGatherer[In, Out] @unchecked => specialized
-          case _                                                  => null
+          case _: OneToOneGatherer[?, ?]                            => gatherer
+          case _: JGatherers.OneToOneGatherer[?, ?]                 => gatherer
+          case _                                                    => null
         }
         multiMode = false
+        pendingOverflow = null
         needInvokeOnCompleteCallback = true
       }
 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index fdd2dae31a..0bdd5de129 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -872,7 +872,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
   * Transform each input element into zero or more output elements without requiring tuple or collection allocations
   * imposed by the operator API itself.
   *
-   * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields.
+   * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables.
   * The provided [[GatherCollector]] can emit zero or more output elements for each input element.
   *
   * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala
index d5c3b1d091..d65aaeb411 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala
@@ -24,6 +24,7 @@ import org.apache.pekko.japi.function
  * Collector passed to [[Gatherer]] for emitting output elements.
  *
  * The collector is only valid while the current [[Gatherer]] callback is running.
+ * Emitted elements MUST NOT be `null`.
  *
  * @since 1.3.0
  */
@@ -38,7 +39,7 @@ trait GatherCollector[-Out] extends function.Procedure[Out] {
  * A stateful gatherer for the `gather` operator.
  *
  * A new gatherer instance is created for each materialization and on each supervision restart.
- * It can keep mutable state in fields.
+ * It can keep mutable state in fields or via captured variables.
  *
  * @since 1.3.0
  */
@@ -54,3 +55,52 @@ trait Gatherer[-In, Out] extends function.Procedure2[In, GatherCollector[Out]] {
    */
   def onComplete(collector: GatherCollector[Out]): Unit = ()
 }
+
+/** Factory methods for [[Gatherer]]. */
+object Gatherers {
+
+  /**
+   * Creates a specialized `Gatherer` for one-to-one transformations (exactly one output per input).
+   *
+   * This variant avoids the overhead of the `GatherCollector` indirection and achieves the
+   * same performance as the native `map` operator while still supporting mutable state and
+   * the `onComplete` callback.
+   *
+   * @param f the one-to-one transformation function
+   * @since 1.3.0
+   */
+  def oneToOne[In, Out](f: function.Function[In, Out]): Gatherer[In, Out] =
+    new OneToOneGathererImpl[In, Out](f)
+
+  /**
+   * Creates a specialized `Gatherer` for one-to-one transformations with an `onComplete` callback.
+   *
+   * @param f the one-to-one transformation function
+   * @param onComplete callback invoked when the stage terminates or restarts
+   * @since 1.3.0
+   */
+  def oneToOne[In, Out](f: function.Function[In, Out], onComplete: function.Effect): Gatherer[In, Out] =
+    new OneToOneGathererImpl[In, Out](f, onComplete)
+
+  /**
+   * A specialized [[Gatherer]] for one-to-one transformations that avoids the `GatherCollector` overhead.
+   *
+   * @since 1.3.0
+   */
+  @DoNotInherit
+  trait OneToOneGatherer[In, Out] extends Gatherer[In, Out] {
+    def applyOne(in: In): Out
+
+    final override def apply(in: In, collector: GatherCollector[Out]): Unit =
+      collector.push(applyOne(in))
+  }
+
+  private final class OneToOneGathererImpl[In, Out](
+      f: function.Function[In, Out],
+      onCompleteCallback: function.Effect = null)
+      extends OneToOneGatherer[In, Out] {
+    override def applyOne(in: In): Out = f.apply(in)
+    override def onComplete(@annotation.nowarn collector: GatherCollector[Out]): Unit =
+      if (onCompleteCallback != null) onCompleteCallback.apply()
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index cb4f2ae576..ee8d25a670 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2765,7 +2765,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
   * Transform each input element into zero or more output elements without requiring tuple or collection allocations
   * imposed by the operator API itself.
   *
-   * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields.
+   * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables.
   * The provided [[GatherCollector]] can emit zero or more output elements for each input element.
   *
   * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index 80fe136d50..0ce6b0fa47 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -349,9 +349,25 @@ final class SubFlow[In, Out, Mat](
   * Transform each input element into zero or more output elements without requiring tuple or collection allocations
   * imposed by the operator API itself.
   *
-   * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields.
-   * `onComplete` is invoked on upstream completion, upstream failure, downstream cancellation,
-   * abrupt stage termination, and supervision restart.
+   * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables.
+   * The provided [[GatherCollector]] can emit zero or more output elements for each input element.
+   *
+   * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`.
+   *
+   * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion,
+   * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart.
+   * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart,
+   * and are ignored on downstream cancellation and abrupt termination.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the gatherer emits an element and downstream is ready to consume it
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete`
+   *
+   * '''Cancels when''' downstream cancels
    *
    * @since 1.3.0
    */
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index e5d80fb734..0513d06fe4 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -340,9 +340,25 @@ final class SubSource[Out, Mat](
   * Transform each input element into zero or more output elements without requiring tuple or collection allocations
   * imposed by the operator API itself.
   *
-   * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields.
-   * `onComplete` is invoked on upstream completion, upstream failure, downstream cancellation,
-   * abrupt stage termination, and supervision restart.
+   * A new [[Gatherer]] is created for each materialization and can keep mutable state in fields or via captured variables.
+   * The provided [[GatherCollector]] can emit zero or more output elements for each input element.
+   *
+   * The collector is only valid while the callback is running. Emitted elements MUST NOT be `null`.
+   *
+   * The `onComplete` callback is invoked once whenever the stage terminates or restarts: on upstream completion,
+   * upstream failure, downstream cancellation, abrupt stage termination, or supervision restart.
+   * Elements emitted from `onComplete` are emitted before upstream-failure propagation, completion, or restart,
+   * and are ignored on downstream cancellation and abrupt termination.
+   *
+   * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+   *
+   * '''Emits when''' the gatherer emits an element and downstream is ready to consume it
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes and the gatherer has emitted all pending elements, including `onComplete`
+   *
+   * '''Cancels when''' downstream cancels
    *
    * @since 1.3.0
    */
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala
index 88fa030657..85fa35a0ed 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala
@@ -53,6 +53,41 @@ trait Gatherer[-In, +Out] {
   def onComplete(collector: GatherCollector[Out]): Unit = ()
 }
 
+/** Factory methods for [[Gatherer]]. */
+object Gatherer {
+
+  /**
+   * Creates a specialized `Gatherer` for one-to-one transformations (exactly one output per input).
+   *
+   * This variant avoids the overhead of the `GatherCollector` indirection and achieves the
+   * same performance as the native `map` operator while still supporting mutable state and
+   * the `onComplete` callback.
+   *
+   * @param f the one-to-one transformation function
+   * @since 1.3.0
+   */
+  def oneToOne[In, Out](f: In => Out): Gatherer[In, Out] =
+    new OneToOneGathererImpl(f)
+
+  /**
+   * Creates a specialized `Gatherer` for one-to-one transformations with an `onComplete` callback.
+   *
+   * @param f the one-to-one transformation function
+   * @param onComplete callback invoked when the stage terminates or restarts
+   * @since 1.3.0
+   */
+  def oneToOne[In, Out](f: In => Out, onComplete: () => Unit): Gatherer[In, Out] =
+    new OneToOneGathererImpl(f, onComplete)
+
+  private final class OneToOneGathererImpl[In, Out](
+      f: In => Out,
+      onCompleteCallback: () => Unit = () => ())
+      extends OneToOneGatherer[In, Out] {
+    override def applyOne(in: In): Out = f(in)
+    override def onComplete(collector: GatherCollector[Out]): Unit = onCompleteCallback()
+  }
+}
+
 /**
  * INTERNAL API
  */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to