This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch hepin-gather-statefulmap-coverage-v2
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to
refs/heads/hepin-gather-statefulmap-coverage-v2 by this push:
new 49f2270bb0 stream: apply scalafmt formatting to gather operator files
49f2270bb0 is described below
commit 49f2270bb06e1156b1ac49e05736127b13eeafbd
Author: He-Pin <[email protected]>
AuthorDate: Tue Apr 7 03:59:05 2026 +0800
stream: apply scalafmt formatting to gather operator files
🤖 Generated with [Qoder](https://qoder.com)
---
.../scala/docs/stream/operators/flow/Gather.scala | 2 +-
.../pekko/stream/scaladsl/FlowGatherSpec.scala | 55 ++++++++++++----------
.../org/apache/pekko/stream/impl/fusing/Ops.scala | 8 ++--
3 files changed, 36 insertions(+), 29 deletions(-)
diff --git a/docs/src/test/scala/docs/stream/operators/flow/Gather.scala b/docs/src/test/scala/docs/stream/operators/flow/Gather.scala
index 4b5fbb7a99..f8f7beadc2 100644
--- a/docs/src/test/scala/docs/stream/operators/flow/Gather.scala
+++ b/docs/src/test/scala/docs/stream/operators/flow/Gather.scala
@@ -84,7 +84,7 @@ object Gather {
override def apply(elem: String, collector:
GatherCollector[String]): Unit =
lastElement match {
case Some(last) if last == elem =>
- case _ =>
+ case _ =>
lastElement = Some(elem)
collector.push(elem)
}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
index 6e2b1b8dcc..46c7f060e9 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
@@ -26,11 +26,17 @@ import scala.util.Success
import scala.util.control.NoStackTrace
import org.apache.pekko.Done
-import org.apache.pekko.stream.{ AbruptStageTerminationException,
ActorAttributes, ActorMaterializer, ClosedShape, Supervision }
+import org.apache.pekko.stream.{
+ AbruptStageTerminationException,
+ ActorAttributes,
+ ActorMaterializer,
+ ClosedShape,
+ Supervision
+}
import org.apache.pekko.stream.testkit.{ StreamSpec, TestSubscriber }
import org.apache.pekko.stream.testkit.Utils.TE
import org.apache.pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
-import org.apache.pekko.stream.scaladsl.{ Keep, Flow }
+import org.apache.pekko.stream.scaladsl.{ Flow, Keep }
import org.apache.pekko.testkit.EventFilter
class FlowGatherSpec extends StreamSpec {
@@ -286,7 +292,7 @@ class FlowGatherSpec extends StreamSpec {
override def apply(elem: String, collector:
GatherCollector[String]): Unit =
lastElement match {
case Some(last) if last == elem =>
- case _ =>
+ case _ =>
lastElement = Some(elem)
collector.push(elem)
}
@@ -329,16 +335,16 @@ class FlowGatherSpec extends StreamSpec {
val generation = new AtomicInteger(0)
val (source, sink) = TestSource[String]()
.viaMat(Flow[String].gather(() => {
- val currentGeneration = generation.incrementAndGet()
- new Gatherer[String, String] {
- override def apply(elem: String, collector:
GatherCollector[String]): Unit =
- if (elem == "boom") throw TE("boom")
- else collector.push(s"$elem$currentGeneration")
-
- override def onComplete(collector: GatherCollector[String]):
Unit =
- collector.push(s"onClose$currentGeneration")
- }
- }))(Keep.left)
+ val currentGeneration = generation.incrementAndGet()
+ new Gatherer[String, String] {
+ override def apply(elem: String, collector:
GatherCollector[String]): Unit =
+ if (elem == "boom") throw TE("boom")
+ else collector.push(s"$elem$currentGeneration")
+
+ override def onComplete(collector: GatherCollector[String]): Unit =
+ collector.push(s"onClose$currentGeneration")
+ }
+ }))(Keep.left)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.toMat(TestSink())(Keep.both)
.run()
@@ -688,15 +694,15 @@ class FlowGatherSpec extends StreamSpec {
val closedCounter = new AtomicInteger(0)
val (source, sink) = TestSource[Int]()
.viaMat(Flow[Int].gather(() =>
- new Gatherer[Int, Int] {
- override def apply(elem: Int, collector: GatherCollector[Int]):
Unit =
- collector.push(elem)
+ new Gatherer[Int, Int] {
+ override def apply(elem: Int, collector: GatherCollector[Int]):
Unit =
+ collector.push(elem)
- override def onComplete(collector: GatherCollector[Int]): Unit =
{
- closedCounter.incrementAndGet()
- throw TE("boom")
- }
- }))(Keep.left)
+ override def onComplete(collector: GatherCollector[Int]): Unit = {
+ closedCounter.incrementAndGet()
+ throw TE("boom")
+ }
+ }))(Keep.left)
.toMat(TestSink[Int]())(Keep.both)
.run()
@@ -822,12 +828,13 @@ class FlowGatherSpec extends StreamSpec {
import GraphDSL.Implicits._
val unzip = b.add(Unzip[Int, Int]())
val zip = b.add(Zip[Int, Int]())
- val gather = b.add(Flow[(Int, Int)].gather(() => (elem: (Int, Int),
collector: GatherCollector[(Int, Int)]) => collector.push(elem)))
+ val gather = b.add(Flow[(Int, Int)].gather(() =>
+ (elem: (Int, Int), collector: GatherCollector[(Int, Int)]) =>
collector.push(elem)))
- source ~> unzip.in
+ source ~> unzip.in
unzip.out0 ~> zip.in0
unzip.out1 ~> zip.in1
- zip.out ~> gather ~> sink.in
+ zip.out ~> gather ~> sink.in
ClosedShape
})
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 69beb30715..e7955dcad2 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -2477,7 +2477,7 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext
private def onPushOneToOne(): Unit = {
val elem = oneToOneGatherer match {
- case s: OneToOneGatherer[In, Out] @unchecked =>
s.applyOne(grab(in))
+ case s: OneToOneGatherer[In, Out] @unchecked =>
s.applyOne(grab(in))
case j: JGatherers.OneToOneGatherer[In, Out] @unchecked =>
j.applyOne(grab(in))
}
ReactiveStreamsCompliance.requireNonNullElement(elem)
@@ -2561,9 +2561,9 @@ private[pekko] final class Gather[In, Out](factory: () => Gatherer[In, Out]) ext
throw new IllegalStateException("Gatherer factory must not return
null")
gatherer = newGatherer
oneToOneGatherer = gatherer match {
- case _: OneToOneGatherer[?, ?] => gatherer
- case _: JGatherers.OneToOneGatherer[?, ?] => gatherer
- case _ => null
+ case _: OneToOneGatherer[?, ?] => gatherer
+ case _: JGatherers.OneToOneGatherer[?, ?] => gatherer
+ case _ => null
}
multiMode = false
pendingOverflow = null
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]