This is an automated email from the ASF dual-hosted git repository. hepin pushed a commit to branch doOnFirst in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 999665a51c6fa9a701c8fd9b5e086b1f20989412 Author: He-Pin <[email protected]> AuthorDate: Sun Oct 19 20:25:03 2025 +0800 feat: Add doOnFirst operator --- .../stream/operators/Source-or-Flow/doOnFirst.md | 31 +++++++++++++ docs/src/main/paradox/stream/operators/index.md | 2 + .../scala/docs/stream/operators/DoOnFirst.scala | 33 ++++++++++++++ .../org/apache/pekko/stream/javadsl/FlowTest.java | 22 +++++++++ .../pekko/stream/scaladsl/FlowDoOnFirstSpec.scala | 34 ++++++++++++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../pekko/stream/impl/fusing/DoOnFirst.scala | 53 ++++++++++++++++++++++ .../org/apache/pekko/stream/javadsl/Flow.scala | 15 ++++++ .../org/apache/pekko/stream/javadsl/Source.scala | 15 ++++++ .../org/apache/pekko/stream/javadsl/SubFlow.scala | 15 ++++++ .../apache/pekko/stream/javadsl/SubSource.scala | 15 ++++++ .../org/apache/pekko/stream/scaladsl/Flow.scala | 15 ++++++ 12 files changed, 251 insertions(+) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md new file mode 100644 index 0000000000..a7d79be36c --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md @@ -0,0 +1,31 @@ +# doOnFirst + +Run the given function when the first element is received. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Source.doOnFirst](Source) { scala="#doOnFirst(f:Out=>Unit):FlowOps.this.Repr[Out]" java="#doOnFirst(org.apache.pekko.japi.function.Procedure)" } +@apidoc[Flow.doOnFirst](Flow) { scala="#doOnFirst(f:Out=>Unit):FlowOps.this.Repr[Out]" java="#doOnFirst(org.apache.pekko.japi.function.Procedure)" } + +## Description + +Run the given function when the first element is received. 
+ +## Examples + +Scala +: @@snip [DoOnFirst.scala](/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala) { #imports #doOnFirst } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 4de4334ba9..2b14389ab8 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -148,6 +148,7 @@ depending on being backpressured by downstream or not. |Flow|<a name="contramap"></a>@ref[contramap](Flow/contramap.md)|Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.| |Source/Flow|<a name="detach"></a>@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.| |Flow|<a name="dimap"></a>@ref[dimap](Flow/dimap.md)|Transform this Flow by applying a function `f` to each *incoming* upstream element before it is passed to the Flow, and a function `g` to each *outgoing* downstream element.| +|Source/Flow|<a name="doonfirst"></a>@ref[doOnFirst](Source-or-Flow/doOnFirst.md)|Run the given function when the first element is received.| |Source/Flow|<a name="drop"></a>@ref[drop](Source-or-Flow/drop.md)|Drop `n` elements and then pass any subsequent element downstream.| |Source/Flow|<a name="droprepeated"></a>@ref[dropRepeated](Source-or-Flow/dropRepeated.md)|Only pass on those elements that are distinct from the previous element.| |Source/Flow|<a name="dropwhile"></a>@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element| @@ -444,6 +445,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [detach](Source-or-Flow/detach.md) * [dimap](Flow/dimap.md) * 
[divertTo](Source-or-Flow/divertTo.md) +* [doOnFirst](Source-or-Flow/doOnFirst.md) * [drop](Source-or-Flow/drop.md) * [dropRepeated](Source-or-Flow/dropRepeated.md) * [dropWhile](Source-or-Flow/dropWhile.md) diff --git a/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala b/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala new file mode 100644 index 0000000000..7b314bc5db --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package docs.stream.operators + +//#imports +import org.apache.pekko +import pekko.NotUsed +import pekko.stream.scaladsl._ + +//#imports + +object DoOnFirst { + + // #doOnFirst + val source: Source[Int, NotUsed] = Source(1 to 10) + val mapped: Source[Int, NotUsed] = source.doOnFirst(println) + // #doOnFirst +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index a51af7ecaa..f679ed3e88 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -124,6 +124,28 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("de"); } + @Test + public void mustBeAbleToUseDoOnFirst() throws Exception { + final var invoked = new AtomicInteger(0); + final var future = + Source.range(1, 10) + .via(Flow.of(Integer.class).doOnFirst(invoked::addAndGet)) + .runWith(Sink.ignore(), system); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); + Assert.assertEquals(1, invoked.get()); + } + + @Test + public void mustBeAbleToUseDoOnFirstOnEmptySource() throws Exception { + final var invoked = new AtomicInteger(0); + final var future = + Source.<Integer>empty() + .via(Flow.of(Integer.class).doOnFirst(invoked::addAndGet)) + .runWith(Sink.ignore(), system); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); + Assert.assertEquals(0, invoked.get()); + } + @Test public void mustBeAbleToUseGroupedAdjacentBy() { Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey")) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala new file mode 100644 index 0000000000..ad95e23082 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala @@ -0,0 +1,34 @@ +package 
org.apache.pekko.stream.scaladsl + +import org.apache.pekko.Done +import org.apache.pekko.stream.testkit._ + +class FlowDoOnFirstSpec extends StreamSpec(""" + pekko.stream.materializer.initial-input-buffer-size = 2 + """) with ScriptedTest { + + "A DoOnFirst" must { + + "can only invoke on first" in { + val invoked = new java.util.concurrent.atomic.AtomicInteger(0) + Source(1 to 10) + .via(Flow[Int].doOnFirst(invoked.addAndGet)) + .runWith(Sink.ignore) + .futureValue + .shouldBe(Done) + invoked.get() shouldBe 1 + } + + "will not invoke on empty stream" in { + val invoked = new java.util.concurrent.atomic.AtomicInteger(0) + Source.empty + .via(Flow[Int].doOnFirst(invoked.addAndGet)) + .runWith(Sink.ignore) + .futureValue + .shouldBe(Done) + invoked.get() shouldBe 0 + } + + } + +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 58ea8b5bd5..6977e71ec5 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -35,6 +35,7 @@ import pekko.stream.Attributes._ val log = name("log") val filter = name("filter") val filterNot = name("filterNot") + val doOnFirst = name("doOnFirst") val collect = name("collect") val collectFirst = name("collectFirst") val collectWhile = name("collectWhile") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala new file mode 100644 index 0000000000..f2c444fd5a --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.stream.Attributes.SourceLocation +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet } +import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } + +/** + * INTERNAL API + */ +@InternalApi private[pekko] final class DoOnFirst[In](f: In => Unit) extends GraphStage[FlowShape[In, In]] { + private val in = Inlet[In]("DoOnFirst.in") + private val out = Outlet[In]("DoOnFirst.out") + override val shape: FlowShape[In, In] = FlowShape(in, out) + + override def initialAttributes: Attributes = DefaultAttributes.doOnFirst and SourceLocation.forLambda(f) + + override def createLogic(inheritedAttributes: org.apache.pekko.stream.Attributes) = + new GraphStageLogic(shape) with InHandler with OutHandler { + self => + final override def onPush(): Unit = push(out, grab(in)) + final override def onPull(): Unit = pull(in) + setHandler(out, this) + setHandler(in, + new InHandler { + override def onPush(): Unit = { + setHandler(in, self) + val elem = grab(in) + f(elem) + push(out, elem) + } + }) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index f90bf25690..e648aa8a1e 100755 --- 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1123,6 +1123,21 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def filterNot(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.filterNot(p.test)) + /** + * Run the given function when the first element is received. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def doOnFirst(f: function.Procedure[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.doOnFirst(f(_))) + /** * Only pass on those elements that are distinct from the previous element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 1324dd4cb1..70566a2803 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2986,6 +2986,21 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def filterNot(p: function.Predicate[Out]): javadsl.Source[Out, Mat] = new Source(delegate.filterNot(p.test)) + /** + * Run the given function when the first element is received. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def doOnFirst(f: function.Procedure[Out]): javadsl.Source[Out, Mat] = new Source(delegate.doOnFirst(f(_))) + /** * Only pass on those elements that are distinct from the previous element. 
* diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 45cb38c527..466584943a 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -525,6 +525,21 @@ final class SubFlow[In, Out, Mat]( def filterNot(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.filterNot(p.test)) + /** + * Run the given function when the first element is received. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def doOnFirst(f: function.Procedure[Out]): SubFlow[In, Out, Mat] = new javadsl.SubFlow(delegate.doOnFirst(f(_))) + /** * Only pass on those elements that are distinct from the previous element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index f8d45ce854..082717a843 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -516,6 +516,21 @@ final class SubSource[Out, Mat]( def filterNot(p: function.Predicate[Out]): SubSource[Out, Mat] = new SubSource(delegate.filterNot(p.test)) + /** + * Run the given function when the first element is received. 
+ * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def doOnFirst(f: function.Procedure[Out]): SubSource[Out, Mat] = new SubSource(delegate.doOnFirst(f(_))) + /** * Only pass on those elements that are distinct from the previous element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 159036fd25..528b6c6a7c 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1567,6 +1567,21 @@ trait FlowOps[+Out, +Mat] { def filterNot(p: Out => Boolean): Repr[Out] = via(Flow[Out].filter(!p(_)).withAttributes(DefaultAttributes.filterNot and SourceLocation.forLambda(p))) + /** + * Run the given function when the first element is received. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def doOnFirst(f: Out => Unit): Repr[Out] = via(new DoOnFirst[Out](f)) + /** * Only pass on those elements that are distinct from the previous element. * --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
