This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.3.x-doOnFirst
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 4bc3a75890349efa6861face7897ce27b043b2ef
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Oct 25 19:58:36 2025 +0800

    feat: Add doOnFirst operator (#2363)
    
    (cherry picked from commit 36d8a1b7b7562870bc2b9d450d4921f88c069d13)
---
 .../stream/operators/Source-or-Flow/doOnFirst.md   | 31 +++++++++++++
 docs/src/main/paradox/stream/operators/index.md    |  2 +
 .../scala/docs/stream/operators/DoOnFirst.scala    | 33 ++++++++++++++
 .../org/apache/pekko/stream/javadsl/FlowTest.java  | 22 +++++++++
 .../pekko/stream/scaladsl/FlowDoOnFirstSpec.scala  | 51 +++++++++++++++++++++
 .../org/apache/pekko/stream/impl/Stages.scala      |  1 +
 .../pekko/stream/impl/fusing/DoOnFirst.scala       | 53 ++++++++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Flow.scala     | 15 ++++++
 .../org/apache/pekko/stream/javadsl/Source.scala   | 15 ++++++
 .../org/apache/pekko/stream/javadsl/SubFlow.scala  | 15 ++++++
 .../apache/pekko/stream/javadsl/SubSource.scala    | 15 ++++++
 .../org/apache/pekko/stream/scaladsl/Flow.scala    | 15 ++++++
 12 files changed, 268 insertions(+)

diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md 
b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md
new file mode 100644
index 0000000000..13a57301b3
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/doOnFirst.md
@@ -0,0 +1,31 @@
+# doOnFirst
+
+Run the given function when the first element is received.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.doOnFirst](Source) { 
scala="#doOnFirst(f:Out=&gt;Unit):FlowOps.this.Repr[Out]" 
java="#doOnFirst(org.apache.pekko.japi.function.Procedure)" }
+@apidoc[Flow.doOnFirst](Flow) { 
scala="#doOnFirst(f:Out=&gt;Unit):FlowOps.this.Repr[Out]" 
java="#doOnFirst(org.apache.pekko.japi.function.Procedure)" }
+
+## Description
+
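+Run the given function with the first element, before that element is emitted downstream.
+The function is not invoked for any later element, and it is never invoked if the stream
+completes without emitting an element. All elements are passed through unchanged.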
+
+## Examples
+
+Scala
+:  @@snip 
[Flow.scala](/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala) { 
#imports #doOnFirst }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when upstream emits an element
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index 62169a4d7b..58f9191077 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -159,6 +159,7 @@ depending on being backpressured by downstream or not.
 |Flow|<a name="contramap"></a>@ref[contramap](Flow/contramap.md)|Transform 
this Flow by applying a function to each *incoming* upstream element before it 
is passed to the Flow.|
 |Source/Flow|<a 
name="detach"></a>@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand 
from downstream demand without detaching the stream rates.|
 |Flow|<a name="dimap"></a>@ref[dimap](Flow/dimap.md)|Transform this Flow by 
applying a function `f` to each *incoming* upstream element before it is passed 
to the Flow, and a function `g` to each *outgoing* downstream element.|
+|Source/Flow|<a 
name="doonfirst"></a>@ref[doOnFirst](Source-or-Flow/doOnFirst.md)|Run the given 
function when the first element is received.|
 |Source/Flow|<a name="drop"></a>@ref[drop](Source-or-Flow/drop.md)|Drop `n` 
elements and then pass any subsequent element downstream.|
 |Source/Flow|<a 
name="droprepeated"></a>@ref[dropRepeated](Source-or-Flow/dropRepeated.md)|Only 
pass on those elements that are distinct from the previous element.|
 |Source/Flow|<a 
name="dropwhile"></a>@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements 
as long as a predicate function return true for the element|
@@ -457,6 +458,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [detach](Source-or-Flow/detach.md)
 * [dimap](Flow/dimap.md)
 * [divertTo](Source-or-Flow/divertTo.md)
+* [doOnFirst](Source-or-Flow/doOnFirst.md)
 * [drop](Source-or-Flow/drop.md)
 * [dropRepeated](Source-or-Flow/dropRepeated.md)
 * [dropWhile](Source-or-Flow/dropWhile.md)
diff --git a/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala 
b/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala
new file mode 100644
index 0000000000..7b314bc5db
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/DoOnFirst.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.stream.operators
+
+//#imports
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.stream.scaladsl._
+
+//#imports
+
+object DoOnFirst {
+
+  // #doOnFirst
+  val source: Source[Int, NotUsed] = Source(1 to 10)
+  val mapped: Source[Int, NotUsed] = source.doOnFirst(println) // println is applied to the first element (1) only; every element is still emitted unchanged
+  // #doOnFirst
+}
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index c2603a8c4d..6fd2c483fd 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -123,6 +123,28 @@ public class FlowTest extends StreamTest {
     probe.expectMsgEquals("de");
   }
 
+  @Test
+  public void mustBeAbleToUseDoOnFirst() throws Exception {
+    final var invoked = new AtomicInteger(0);
+    final var future =
+        Source.range(1, 10)
+            .via(Flow.of(Integer.class).doOnFirst(invoked::addAndGet)) // callback adds the element's value to the counter
+            .runWith(Sink.ignore(), system);
+    future.toCompletableFuture().get(3, TimeUnit.SECONDS);
+    Assert.assertEquals(1, invoked.get()); // only the first element (value 1) was added; a call per element would give a larger sum
+  }
+
+  @Test
+  public void mustBeAbleToUseDoOnFirstOnEmptySource() throws Exception {
+    final var invoked = new AtomicInteger(0);
+    final var future =
+        Source.<Integer>empty()
+            .via(Flow.of(Integer.class).doOnFirst(invoked::addAndGet)) // callback would add the element's value, but no element arrives
+            .runWith(Sink.ignore(), system);
+    future.toCompletableFuture().get(3, TimeUnit.SECONDS);
+    Assert.assertEquals(0, invoked.get()); // counter untouched: the callback never ran for the empty stream
+  }
+
   @Test
   public void mustBeAbleToUseGroupedAdjacentBy() {
     Source.from(Arrays.asList("Hello", "Hi", "Greetings", "Hey"))
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala
new file mode 100644
index 0000000000..9c832d8003
--- /dev/null
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDoOnFirstSpec.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko.Done
+import org.apache.pekko.stream.testkit._
+
+class FlowDoOnFirstSpec extends StreamSpec("""
+    pekko.stream.materializer.initial-input-buffer-size = 2
+  """) with ScriptedTest { // NOTE(review): nothing in this spec appears to use ScriptedTest - confirm the mixin is needed
+
+  "A DoOnFirst" must {
+
+    "can only invoke on first" in {
+      val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+      Source(1 to 10)
+        .via(Flow[Int].doOnFirst(invoked.addAndGet)) // callback adds the element's value to the counter
+        .runWith(Sink.ignore)
+        .futureValue
+        .shouldBe(Done)
+      invoked.get() shouldBe 1 // only the first element (1) was added; a call per element would total 55
+    }
+
+    "will not invoke on empty stream" in {
+      val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
+      Source.empty
+        .via(Flow[Int].doOnFirst(invoked.addAndGet)) // no element arrives, so the callback has nothing to add
+        .runWith(Sink.ignore)
+        .futureValue
+        .shouldBe(Done)
+      invoked.get() shouldBe 0 // counter untouched: the callback never ran
+    }
+
+  }
+
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 58ea8b5bd5..6977e71ec5 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -35,6 +35,7 @@ import pekko.stream.Attributes._
     val log = name("log")
     val filter = name("filter")
     val filterNot = name("filterNot")
+    val doOnFirst = name("doOnFirst")
     val collect = name("collect")
     val collectFirst = name("collectFirst")
     val collectWhile = name("collectWhile")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala
new file mode 100644
index 0000000000..f2c444fd5a
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DoOnFirst.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.Attributes.SourceLocation
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
+import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler 
}
+
+/**
+ * INTERNAL API
+ *
+ * Pass-through stage that applies `f` to the first element pushed from upstream and
+ * then forwards that element, and every later one, downstream unchanged.
+ *
+ * The inlet starts with a one-shot handler. On the first push it installs the stage
+ * logic itself as the inlet handler, calls `f` with the element and pushes it on.
+ * Later pushes therefore go straight through without calling `f`. If no element is
+ * ever pushed, `f` is never called.
+ *
+ * @param f side-effecting function applied to the first element only
+ */
+@InternalApi private[pekko] final class DoOnFirst[In](f: In => Unit) extends 
GraphStage[FlowShape[In, In]] {
+  private val in = Inlet[In]("DoOnFirst.in")
+  private val out = Outlet[In]("DoOnFirst.out")
+  override val shape: FlowShape[In, In] = FlowShape(in, out)
+
+  override def initialAttributes: Attributes = DefaultAttributes.doOnFirst and 
SourceLocation.forLambda(f)
+
+  override def createLogic(inheritedAttributes: 
org.apache.pekko.stream.Attributes) =
+    new GraphStageLogic(shape) with InHandler with OutHandler { // the logic doubles as the pass-through handler for both ports
+      self => // alias so the one-shot handler below can install the logic as the inlet handler
+      final override def onPush(): Unit = push(out, grab(in)) // pass-through path: forwards without calling f
+      final override def onPull(): Unit = pull(in)
+      setHandler(out, this)
+      setHandler(in, // initial inlet handler: only ever sees the first element
+        new InHandler {
+          override def onPush(): Unit = {
+            setHandler(in, self) // swap to the pass-through handler before f is called
+            val elem = grab(in)
+            f(elem)
+            push(out, elem) // the first element is emitted after f has returned
+          }
+        })
+    }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index fe5a1592da..9809240973 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1193,6 +1193,21 @@ final class Flow[In, Out, Mat](delegate: 
scaladsl.Flow[In, Out, Mat]) extends Gr
   def filterNot(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] =
     new Flow(delegate.filterNot(p.test))
 
+  /**
+   * Run the given function when the first element is received.
+   *
+   * The procedure is called with the first element before that element is emitted
+   * downstream. It is not called for later elements, and never if the stream
+   * completes without emitting an element. Elements are passed through unchanged.
+   *
+   * '''Emits when''' upstream emits an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param f procedure applied to the first element only
+   * @since 1.3.0
+   */
+  def doOnFirst(f: function.Procedure[Out]): javadsl.Flow[In, Out, Mat] = new 
Flow(delegate.doOnFirst(f(_)))
+
   /**
    * Only pass on those elements that are distinct from the previous element.
    *
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 789eed2130..7545282a4f 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -3186,6 +3186,21 @@ final class Source[Out, Mat](delegate: 
scaladsl.Source[Out, Mat]) extends Graph[
   def filterNot(p: function.Predicate[Out]): javadsl.Source[Out, Mat] =
     new Source(delegate.filterNot(p.test))
 
+  /**
+   * Run the given function when the first element is received.
+   *
+   * The procedure is called with the first element before that element is emitted
+   * downstream. It is not called for later elements, and never if the stream
+   * completes without emitting an element. Elements are passed through unchanged.
+   *
+   * '''Emits when''' upstream emits an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param f procedure applied to the first element only
+   * @since 1.3.0
+   */
+  def doOnFirst(f: function.Procedure[Out]): javadsl.Source[Out, Mat] = new 
Source(delegate.doOnFirst(f(_)))
+
   /**
    * Only pass on those elements that are distinct from the previous element.
    *
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index f873b8b287..55c292e4d4 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -529,6 +529,21 @@ class SubFlow[In, Out, Mat](
   def filterNot(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
     new SubFlow(delegate.filterNot(p.test))
 
+  /**
+   * Run the given function when the first element is received.
+   *
+   * The procedure is called with the first element before that element is emitted
+   * downstream. It is not called for later elements, and never if the stream
+   * completes without emitting an element. Elements are passed through unchanged.
+   *
+   * '''Emits when''' upstream emits an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param f procedure applied to the first element only
+   * @since 1.3.0
+   */
+  def doOnFirst(f: function.Procedure[Out]): SubFlow[In, Out, Mat] = new 
javadsl.SubFlow(delegate.doOnFirst(f(_)))
+
   /**
    * Only pass on those elements that are distinct from the previous element.
    *
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index a762b2500f..8a9fb31c28 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -520,6 +520,21 @@ class SubSource[Out, Mat](
   def filterNot(p: function.Predicate[Out]): SubSource[Out, Mat] =
     new SubSource(delegate.filterNot(p.test))
 
+  /**
+   * Run the given function when the first element is received.
+   *
+   * The procedure is called with the first element before that element is emitted
+   * downstream. It is not called for later elements, and never if the stream
+   * completes without emitting an element. Elements are passed through unchanged.
+   *
+   * '''Emits when''' upstream emits an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param f procedure applied to the first element only
+   * @since 1.3.0
+   */
+  def doOnFirst(f: function.Procedure[Out]): SubSource[Out, Mat] = new 
SubSource(delegate.doOnFirst(f(_)))
+
   /**
    * Only pass on those elements that are distinct from the previous element.
    *
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index d3d161b7ba..5c1116741b 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1636,6 +1636,21 @@ trait FlowOps[+Out, +Mat] {
   def filterNot(p: Out => Boolean): Repr[Out] =
     via(Flow[Out].filter(!p(_)).withAttributes(DefaultAttributes.filterNot and 
SourceLocation.forLambda(p)))
 
+  /**
+   * Run the given function when the first element is received.
+   *
+   * The function is called with the first element before that element is emitted
+   * downstream. It is not called for later elements, and never if the stream
+   * completes without emitting an element. Elements are passed through unchanged.
+   *
+   * '''Emits when''' upstream emits an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * @param f function applied to the first element only
+   * @since 1.3.0
+   */
+  def doOnFirst(f: Out => Unit): Repr[Out] = via(new DoOnFirst[Out](f))
+
   /**
    * Only pass on those elements that are distinct from the previous element.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to