This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.3.x-doOnFirst
in repository https://gitbox.apache.org/repos/asf/pekko.git
commit 4bc3a75890349efa6861face7897ce27b043b2ef Author: He-Pin(kerr) <[email protected]> AuthorDate: Sat Oct 25 19:58:36 2025 +0800 feat: Add doOnFirst operator (#2363) (cherry picked from commit 36d8a1b7b7562870bc2b9d450d4921f88c069d13) --- .../stream/operators/Source-or-Flow/doOnFirst.md | 31 +++++++++++++ docs/src/main/paradox/stream/operators/index.md | 2 + .../scala/docs/stream/operators/DoOnFirst.scala | 33 ++++++++++++++ .../org/apache/pekko/stream/javadsl/FlowTest.java | 22 +++++++++ .../pekko/stream/scaladsl/FlowDoOnFirstSpec.scala | 51 +++++++++++++++++++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../pekko/stream/impl/fusing/DoOnFirst.scala | 53 ++++++++++++++++++++++ .../org/apache/pekko/stream/javadsl/Flow.scala | 15 ++++++ .../org/apache/pekko/stream/javadsl/Source.scala | 15 ++++++ .../org/apache/pekko/stream/javadsl/SubFlow.scala | 15 ++++++ .../apache/pekko/stream/javadsl/SubSource.scala | 15 ++++++ .../org/apache/pekko/stream/scaladsl/Flow.scala | 15 ++++++ 12 files changed, 268 insertions(+) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md new file mode 100644 index 0000000000..13a57301b3 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md @@ -0,0 +1,31 @@ +# doOnFirst + +Run the given function when the first element is received. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Source.doOnFirst](Source) { scala="#doOnFirst(f:Out=>Unit):FlowOps.this.Repr[Out]" java="#doOnFirst(org.apache.pekko.japi.function.Procedure)" } +@apidoc[Flow.doOnFirst](Flow) { scala="#doOnFirst(f:Out=>Unit):FlowOps.this.Repr[Out]" java="#doOnFirst(org.apache.pekko.japi.function.Procedure)" } + +## Description + +Run the given function when the first element is received. 
+ +## Examples + +Scala +: @@snip [Flow.scala](/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala) { #imports #doOnFirst } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 62169a4d7b..58f9191077 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -159,6 +159,7 @@ depending on being backpressured by downstream or not. |Flow|<a name="contramap"></a>@ref[contramap](Flow/contramap.md)|Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.| |Source/Flow|<a name="detach"></a>@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.| |Flow|<a name="dimap"></a>@ref[dimap](Flow/dimap.md)|Transform this Flow by applying a function `f` to each *incoming* upstream element before it is passed to the Flow, and a function `g` to each *outgoing* downstream element.| +|Source/Flow|<a name="doonfirst"></a>@ref[doOnFirst](Source-or-Flow/doOnFirst.md)|Run the given function when the first element is received.| |Source/Flow|<a name="drop"></a>@ref[drop](Source-or-Flow/drop.md)|Drop `n` elements and then pass any subsequent element downstream.| |Source/Flow|<a name="droprepeated"></a>@ref[dropRepeated](Source-or-Flow/dropRepeated.md)|Only pass on those elements that are distinct from the previous element.| |Source/Flow|<a name="dropwhile"></a>@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element| @@ -457,6 +458,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [detach](Source-or-Flow/detach.md) * [dimap](Flow/dimap.md) * 
[divertTo](Source-or-Flow/divertTo.md) +* [doOnFirst](Source-or-Flow/doOnFirst.md) * [drop](Source-or-Flow/drop.md) * [dropRepeated](Source-or-Flow/dropRepeated.md) * [dropWhile](Source-or-Flow/dropWhile.md) diff --git a/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala b/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala new file mode 100644 index 0000000000..7b314bc5db --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package docs.stream.operators + +//#imports +import org.apache.pekko +import pekko.NotUsed +import pekko.stream.scaladsl._ + +//#imports + +object DoOnFirst { + + // #doOnFirst + val source: Source[Int, NotUsed] = Source(1 to 10) + val mapped: Source[Int, NotUsed] = source.doOnFirst(println) + // #doOnFirst +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index c2603a8c4d..6fd2c483fd 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -123,6 +123,28 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("de"); } + @Test + public void mustBeAbleToUseDoOnFirst() throws Exception { + final var invoked = new AtomicInteger(0); + final var future = + Source.range(1, 10) + .via(Flow.of(Integer.class).doOnFirst(invoked::addAndGet)) + .runWith(Sink.ignore(), system); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); + Assert.assertEquals(1, invoked.get()); + } + + @Test + public void mustBeAbleToUseDoOnFirstOnEmptySource() throws Exception { + final var invoked = new AtomicInteger(0); + final var future = + Source.<Integer>empty() + .via(Flow.of(Integer.class).doOnFirst(invoked::addAndGet)) + .runWith(Sink.ignore(), system); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); + Assert.assertEquals(0, invoked.get()); + } + @Test public void mustBeAbleToUseGroupedAdjacentBy() { Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey")) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala new file mode 100644 index 0000000000..9c832d8003 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko.Done +import org.apache.pekko.stream.testkit._ + +class FlowDoOnFirstSpec extends StreamSpec(""" + pekko.stream.materializer.initial-input-buffer-size = 2 + """) with ScriptedTest { + + "A DoOnFirst" must { + + "can only invoke on first" in { + val invoked = new java.util.concurrent.atomic.AtomicInteger(0) + Source(1 to 10) + .via(Flow[Int].doOnFirst(invoked.addAndGet)) + .runWith(Sink.ignore) + .futureValue + .shouldBe(Done) + invoked.get() shouldBe 1 + } + + "will not invoke on empty stream" in { + val invoked = new java.util.concurrent.atomic.AtomicInteger(0) + Source.empty + .via(Flow[Int].doOnFirst(invoked.addAndGet)) + .runWith(Sink.ignore) + .futureValue + .shouldBe(Done) + invoked.get() shouldBe 0 + } + + } + +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 58ea8b5bd5..6977e71ec5 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -35,6 +35,7 @@ import pekko.stream.Attributes._ val log = name("log") val filter = 
name("filter") val filterNot = name("filterNot") + val doOnFirst = name("doOnFirst") val collect = name("collect") val collectFirst = name("collectFirst") val collectWhile = name("collectWhile") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala new file mode 100644 index 0000000000..f2c444fd5a --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.stream.impl.fusing + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.stream.Attributes.SourceLocation +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet } +import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } + +/** + * INTERNAL API + */ +@InternalApi private[pekko] final class DoOnFirst[In](f: In => Unit) extends GraphStage[FlowShape[In, In]] { + private val in = Inlet[In]("DoOnFirst.in") + private val out = Outlet[In]("DoOnFirst.out") + override val shape: FlowShape[In, In] = FlowShape(in, out) + + override def initialAttributes: Attributes = DefaultAttributes.doOnFirst and SourceLocation.forLambda(f) + + override def createLogic(inheritedAttributes: org.apache.pekko.stream.Attributes) = + new GraphStageLogic(shape) with InHandler with OutHandler { + self => + final override def onPush(): Unit = push(out, grab(in)) + final override def onPull(): Unit = pull(in) + setHandler(out, this) + setHandler(in, + new InHandler { + override def onPush(): Unit = { + setHandler(in, self) + val elem = grab(in) + f(elem) + push(out, elem) + } + }) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index fe5a1592da..9809240973 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1193,6 +1193,21 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def filterNot(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.filterNot(p.test)) + /** + * Run the given function when the first element is received. 
+ * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def doOnFirst(f: function.Procedure[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.doOnFirst(f(_))) + /** * Only pass on those elements that are distinct from the previous element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 789eed2130..7545282a4f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3186,6 +3186,21 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def filterNot(p: function.Predicate[Out]): javadsl.Source[Out, Mat] = new Source(delegate.filterNot(p.test)) + /** + * Run the given function when the first element is received. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def doOnFirst(f: function.Procedure[Out]): javadsl.Source[Out, Mat] = new Source(delegate.doOnFirst(f(_))) + /** * Only pass on those elements that are distinct from the previous element. 
* diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index f873b8b287..55c292e4d4 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -529,6 +529,21 @@ class SubFlow[In, Out, Mat]( def filterNot(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.filterNot(p.test)) + /** + * Run the given function when the first element is received. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def doOnFirst(f: function.Procedure[Out]): SubFlow[In, Out, Mat] = new javadsl.SubFlow(delegate.doOnFirst(f(_))) + /** * Only pass on those elements that are distinct from the previous element. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index a762b2500f..8a9fb31c28 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -520,6 +520,21 @@ class SubSource[Out, Mat]( def filterNot(p: function.Predicate[Out]): SubSource[Out, Mat] = new SubSource(delegate.filterNot(p.test)) + /** + * Run the given function when the first element is received. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def doOnFirst(f: function.Procedure[Out]): SubSource[Out, Mat] = new SubSource(delegate.doOnFirst(f(_))) + /** * Only pass on those elements that are distinct from the previous element. 
* diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index d3d161b7ba..5c1116741b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1636,6 +1636,21 @@ trait FlowOps[+Out, +Mat] { def filterNot(p: Out => Boolean): Repr[Out] = via(Flow[Out].filter(!p(_)).withAttributes(DefaultAttributes.filterNot and SourceLocation.forLambda(p))) + /** + * Run the given function when the first element is received. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.3.0 + */ + def doOnFirst(f: Out => Unit): Repr[Out] = via(new DoOnFirst[Out](f)) + /** * Only pass on those elements that are distinct from the previous element. * --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
