This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new a1ade992ff feat: Add Sink#count operator. (#2244)
a1ade992ff is described below

commit a1ade992ffd8da820383d72ce3dca15b8242f58a
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Sep 21 16:15:13 2025 +0800

    feat: Add Sink#count operator. (#2244)
    
    * feat: Add Sink#count operator.
    
    * Use .asInstanceOf for better performance
    
    ---------
    
    Co-authored-by: Matthew de Detrich <[email protected]>
---
 .../main/paradox/stream/operators/Sink/count.md    | 39 +++++++++++++
 docs/src/main/paradox/stream/operators/index.md    |  2 +
 .../jdocs/stream/operators/SinkDocExamples.java    |  9 +++
 .../org/apache/pekko/stream/javadsl/SinkTest.java  |  7 +++
 .../apache/pekko/stream/scaladsl/SinkSpec.scala    | 14 +++++
 .../org/apache/pekko/stream/impl/Stages.scala      |  1 +
 .../pekko/stream/impl/fusing/CountSink.scala       | 66 ++++++++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Sink.scala     | 21 +++++++
 .../org/apache/pekko/stream/scaladsl/Sink.scala    | 22 +++++++-
 9 files changed, 180 insertions(+), 1 deletion(-)

diff --git a/docs/src/main/paradox/stream/operators/Sink/count.md 
b/docs/src/main/paradox/stream/operators/Sink/count.md
new file mode 100644
index 0000000000..44c02300bd
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Sink/count.md
@@ -0,0 +1,39 @@
+# Sink.count
+
+Counts all incoming elements until upstream terminates.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.count](Sink$) { 
scala="#count[T]:org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Long]]"
 java="#count()" }
+
+
+## Description
+
+Counts values emitted from the stream. The count is available through a 
@scala[`Future`] @java[`CompletionStage`]
+which completes when the stream completes.
+
+## Example
+
+Given a stream of numbers we can count the numbers with the `count` operator
+
+Scala
+:   @@snip 
[SinkSpec.scala](/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala)
 { #count-operator-example }
+
+Java
+:   @@snip 
[SinkDocExamples.java](/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java)
 { #count-operator-example }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**completes** when upstream completes
+
+**backpressures** never (counting is a lightweight operation)
+
+**cancels** never
+
+@@@
+
+
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index c1cd3e21e0..aee2d94e1e 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -56,6 +56,7 @@ These built-in sinks are available from 
@scala[`org.apache.pekko.stream.scaladsl
 |Sink|<a 
name="collection"></a>@ref[collection](Sink/collection.md)|@scala[Collect all 
values emitted from the stream into a collection.]@java[Operator only available 
in the Scala API. The closest operator in the Java API is 
@ref[`Sink.seq`](Sink/seq.md)].|
 |Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several 
sinks into one using a user specified strategy|
 |Sink|<a 
name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams
 the elements to the given future sink once it successfully completes. |
+|Sink|<a name="count"></a>@ref[count](Sink/count.md)|Counts all incoming 
elements until upstream terminates.|
 |Sink|<a name="exists"></a>@ref[exists](Sink/exists.md)|A `Sink` that will 
test the given predicate `p` for every received element and completes with the 
result.|
 |Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements 
with a function, where each invocation will get the new element and the result 
from the previous fold invocation.|
 |Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over 
emitted elements with a function, where each invocation will get the new 
element and the result from the previous fold invocation.|
@@ -433,6 +434,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [conflate](Source-or-Flow/conflate.md)
 * [conflateWithSeed](Source-or-Flow/conflateWithSeed.md)
 * [contramap](Flow/contramap.md)
+* [count](Sink/count.md)
 * [cycle](Source/cycle.md)
 * [deflate](Compression/deflate.md)
 * [delay](Source-or-Flow/delay.md)
diff --git a/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java 
b/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java
index 30a6609b6b..bc41ab6909 100644
--- a/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java
+++ b/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java
@@ -52,6 +52,15 @@ public class SinkDocExamples {
     // #seq-operator-example
   }
 
+  // Doc sample for Sink.count: the lines between the #count-operator-example markers
+  // are the snippet referenced from the Sink/count.md operator page.
+  static void countExample() {
+    // #count-operator-example
+    Source<Integer, NotUsed> ints = Source.range(1, 10);
+    CompletionStage<Long> count = ints.runWith(Sink.count(), system);
+    count.thenAccept(System.out::println);
+    // 10
+    // #count-operator-example
+  }
+
   static void takeLastExample() {
     // #takeLast-operator-example
     // pair of (Name, GPA)
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
index f5c220841d..882cf0ccbf 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
@@ -34,6 +34,7 @@ import org.apache.pekko.stream.testkit.javadsl.TestSink;
 import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
 import org.apache.pekko.testkit.PekkoSpec;
 import org.apache.pekko.testkit.javadsl.TestKit;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.reactivestreams.Publisher;
@@ -262,4 +263,10 @@ public class SinkTest extends StreamTest {
     boolean anyMatch = cs.toCompletableFuture().get(100, 
TimeUnit.MILLISECONDS);
     assertTrue(anyMatch);
   }
+
+  /**
+   * Verifies that {@code Sink.count()} materializes the number of elements emitted by upstream:
+   * a range of 1 to 10 must yield a count of 10.
+   */
+  @Test
+  public void sinkMustBeAbleToUseCount() throws Exception {
+    CompletionStage<Long> cs = Source.range(1, 10).runWith(Sink.count(), system);
+    // Bounded wait, like the neighbouring tests: if the stream never completes the test
+    // fails with a TimeoutException instead of blocking the whole suite on join().
+    Assert.assertEquals(10, cs.toCompletableFuture().get(3, TimeUnit.SECONDS).longValue());
+  }
 }
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
index 38c62edd73..481c2a2cee 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala
@@ -324,6 +324,20 @@ class SinkSpec extends StreamSpec with DefaultTimeout with 
ScalaFutures {
     }
   }
 
+  "The count sink" must {
+    "count the number of elements in the stream" in {
+      // The region between the #count-operator-example markers is the snippet
+      // referenced from the Sink/count.md operator page.
+      // #count-operator-example
+      val source = Source(1 to 10)
+      val result = source.runWith(Sink.count)
+      val count = result.futureValue
+      println(count)
+      // will print
+      // 10
+      // #count-operator-example
+      // The actual test check: the range 1 to 10 emits ten elements.
+      assert(result.futureValue == 10)
+    }
+  }
+
   "The foreach sink" must {
     "illustrate println" in {
       // #foreach
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index cd01348ce3..9c7c1d56e1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -154,6 +154,7 @@ import pekko.stream.Attributes._
     val lastOptionSink = name("lastOptionSink")
     val takeLastSink = name("takeLastSink")
     val seqSink = name("seqSink")
+    val countSink = name("countSink")
     val publisherSink = name("publisherSink")
     val fanoutPublisherSink = name("fanoutPublisherSink")
     val ignoreSink = name("ignoreSink")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CountSink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CountSink.scala
new file mode 100644
index 0000000000..6a665061d5
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CountSink.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import scala.concurrent.{ Future, Promise }
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.stream.{ AbruptStageTerminationException, Attributes, Inlet, 
SinkShape }
+import pekko.stream.impl.Stages.DefaultAttributes
+import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, 
InHandler }
+
+/**
+ * INTERNAL API
+ *
+ * Sink stage that counts the elements it receives. The materialized `Future[Long]` is
+ *  - completed with the number of elements once upstream finishes,
+ *  - failed with the upstream error when upstream fails,
+ *  - failed with an [[AbruptStageTerminationException]] when the stage stops before either of those.
+ */
+@InternalApi private[pekko] object CountSink
+    extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Long]] {
+  // The port name follows the stage name so that debug output identifies this sink.
+  private val in = Inlet[Any]("count.in")
+  override def shape: SinkShape[Any] = SinkShape.of(in)
+  override def toString: String = "CountSink"
+  override protected def initialAttributes: Attributes = DefaultAttributes.countSink
+
+  /**
+   * Creates the per-materialization logic together with the future that will carry the count.
+   *
+   * @param inheritedAttributes attributes passed in by the materializer (not read by this stage)
+   * @return the stage logic and the `Future[Long]` resolved when the stage terminates
+   */
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Long]) = {
+    // Promise and counter are created per call, so each materialization counts independently.
+    val promise = Promise[Long]()
+    object logic extends GraphStageLogic(shape) with InHandler {
+      private var counter: Long = 0L
+      // Request the first element as soon as the stage starts.
+      override def preStart(): Unit = pull(in)
+      override def onPush(): Unit = {
+        // The element is not read; only its arrival is counted before asking for the next one.
+        counter += 1
+        pull(in)
+      }
+      override def onUpstreamFinish(): Unit = {
+        promise.trySuccess(counter)
+        completeStage()
+      }
+
+      override def onUpstreamFailure(ex: Throwable): Unit = {
+        promise.tryFailure(ex)
+        failStage(ex)
+      }
+
+      override def postStop(): Unit = {
+        // Only reached with an unresolved promise when neither upstream handler above ran.
+        if (!promise.isCompleted) promise.failure(new AbruptStageTerminationException(this))
+      }
+
+      setHandler(in, this)
+    }
+
+    (logic, promise.future)
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index 092ca7e35b..7894124d50 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -313,6 +313,27 @@ object Sink {
       scaladsl.Sink.seq[In].mapMaterializedValue(fut => fut.map(sq => 
sq.asJava)(ExecutionContext.parasitic).asJava))
   }
 
+  /**
+   * A `Sink` that counts every element it receives until upstream terminates.
+   *
+   * The sink materializes into a `CompletionStage` of `Long` containing the total count of elements
+   * that passed through. Since upstream may be unbounded, consider using `Flow[T].take` or the stricter
+   * `Flow[T].limit` (and their variants) to ensure boundedness.
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Backpressures when''' never (counting is a lightweight operation)
+   *
+   * '''Cancels when''' never
+   *
+   * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
+   *
+   * @return a `Sink` that materializes to a `CompletionStage[Long]` with the element count
+   * @since 2.0.0
+   */
+  def count[In]: Sink[In, CompletionStage[java.lang.Long]] = {
+    // The converted CompletionStage is cast to the Java boxed type instead of being mapped,
+    // which avoids an extra transformation step on the materialized value.
+    val counting = scaladsl.Sink
+      .count[In]
+      .mapMaterializedValue(future => future.asJava.asInstanceOf[CompletionStage[java.lang.Long]])
+    new Sink(counting)
+  }
+
   /**
    * Sends the elements of the stream to the given `ActorRef`.
    * If the target actor terminates the stream will be canceled.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index a58ec833f0..f5dc532eed 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -27,7 +27,7 @@ import pekko.annotation.InternalApi
 import pekko.stream._
 import pekko.stream.impl._
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.GraphStages
+import pekko.stream.impl.fusing.{ CountSink, GraphStages }
 import pekko.stream.stage._
 
 import org.reactivestreams.{ Publisher, Subscriber }
@@ -261,6 +261,26 @@ object Sink {
    */
   def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new 
SeqStage[T, Vector[T]])
 
+  /**
+   * A `Sink` that counts all incoming elements until upstream terminates.
+   *
+   * Since upstream may be unbounded, consider using `Flow[T].take` or the stricter `Flow[T].limit`
+   * (and their variants) to ensure boundedness. The sink materializes into a `Future` of `Long`
+   * containing the total count of elements that passed through.
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Backpressures when''' never (counting is a lightweight operation)
+   *
+   * '''Cancels when''' never
+   *
+   * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
+   *
+   * @return a `Sink` that materializes to a `Future[Long]` with the element count
+   * @since 2.0.0
+   */
+  def count[T]: Sink[T, Future[Long]] = Sink.fromGraph(CountSink)
+
   /**
    * A `Sink` that keeps on collecting incoming elements until upstream 
terminates.
    * As upstream may be unbounded, `Flow[T].take` or the stricter 
`Flow[T].limit` (and their variants)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to