This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new a08b52132f feat: Add mapOption operator (#2414)
a08b52132f is described below
commit a08b52132f4fec119a484fb197a36c1e31d3c291
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Nov 2 16:05:22 2025 +0800
feat: Add mapOption operator (#2414)
---
.../stream/operators/Source-or-Flow/mapOption.md | 31 ++++++++++++++++++++++
docs/src/main/paradox/stream/operators/index.md | 2 ++
.../scala/docs/stream/operators/MapOption.scala | 29 ++++++++++++++++++++
.../apache/pekko/stream/javadsl/SourceTest.java | 11 ++++++++
.../apache/pekko/stream/scaladsl/FlowSpec.scala | 9 +++++++
.../apache/pekko/stream/scaladsl/SourceSpec.scala | 14 ++++++++++
.../org/apache/pekko/stream/impl/Stages.scala | 1 +
.../org/apache/pekko/stream/javadsl/Flow.scala | 22 +++++++++++++++
.../org/apache/pekko/stream/javadsl/Source.scala | 22 +++++++++++++++
.../org/apache/pekko/stream/javadsl/SubFlow.scala | 22 +++++++++++++++
.../apache/pekko/stream/javadsl/SubSource.scala | 22 +++++++++++++++
.../org/apache/pekko/stream/scaladsl/Flow.scala | 20 ++++++++++++++
12 files changed, 205 insertions(+)
diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapOption.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapOption.md
new file mode 100644
index 0000000000..23a5dd1838
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapOption.md
@@ -0,0 +1,31 @@
+# mapOption
+
+Transform each element in the stream by calling a mapping function with it and emitting the contained item if present.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Source.mapOption](Source) { scala="#mapOption[T](f:Out=>scala.Option[T]):FlowOps.this.Repr[T]" java="#mapOption(org.apache.pekko.japi.function.Function)" }
+@apidoc[Flow.mapOption](Flow) { scala="#mapOption[T](f:Out=>scala.Option[T]):FlowOps.this.Repr[T]" java="#mapOption(org.apache.pekko.japi.function.Function)" }
+
+## Description
+
+Transform each element in the stream by calling a mapping function with it and emitting the contained item if present. Elements for which the function returns `None` (`Optional.empty()` in Java) are filtered out.
+
+## Examples
+
+Scala
+: @@snip [MapOption.scala](/docs/src/test/scala/docs/stream/operators/MapOption.scala) { #imports #mapOption }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the mapping function returns a non-empty value
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+@@@
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index ad676d0551..a728b8b8ae 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -176,6 +176,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="logwithmarker"></a>@ref[logWithMarker](Source-or-Flow/logWithMarker.md)|Log elements flowing through the stream as well as completion and erroring.|
|Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.|
|Source/Flow|<a name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
+|Source/Flow|<a name="mapoption"></a>@ref[mapOption](Source-or-Flow/mapOption.md)|Transform each element in the stream by calling a mapping function with it and emitting the contained item if present.|
|Source/Flow|<a name="mapwithresource"></a>@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.|
|Source/Flow|<a name="materializeintosource"></a>@ref[materializeIntoSource](Source-or-Flow/materializeIntoSource.md)|Materializes this Graph, immediately returning its materialized values into a new Source.|
|Source/Flow|<a name="optionalvia"></a>@ref[optionalVia](Source-or-Flow/optionalVia.md)|For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value.|
@@ -540,6 +541,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)
* [mapConcat](Source-or-Flow/mapConcat.md)
* [mapError](Source-or-Flow/mapError.md)
+* [mapOption](Source-or-Flow/mapOption.md)
* [mapWithResource](Source-or-Flow/mapWithResource.md)
* [materializeIntoSource](Source-or-Flow/materializeIntoSource.md)
* [maybe](Source/maybe.md)
diff --git a/docs/src/test/scala/docs/stream/operators/MapOption.scala b/docs/src/test/scala/docs/stream/operators/MapOption.scala
new file mode 100644
index 0000000000..8cbbd5ca3f
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/MapOption.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.stream.operators
+
+//#imports
+import org.apache.pekko
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.scaladsl._
+
+//#imports
+
+object MapOption {
+
+ // #mapOption
+ val source: Source[Int, NotUsed] = Source(1 to 10)
+ val mapped: Source[String, NotUsed] = source.mapOption(elem => if (elem % 2 == 0) Some(elem.toString) else None)
+ // #mapOption
+}
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 2787a4f22e..27db166b6c 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -1888,4 +1888,15 @@ public class SourceTest extends StreamTest {
.get(3, TimeUnit.SECONDS);
assertEquals(Optional.empty(), empty);
}
+
+ @Test
+ public void mustBeAbleToMapOption() throws Exception {
+ final List<Integer> values =
+ Source.from(Arrays.asList(1, 2, 3, 4, 5))
+ .mapOption(i -> i % 2 == 0 ? Optional.of(i * 10) : Optional.empty())
+ .runWith(Sink.seq(), system)
+ .toCompletableFuture()
+ .get(3, TimeUnit.SECONDS);
+ assertEquals(Arrays.asList(20, 40), values);
+ }
}
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
index 88ad5c1218..85b9522164 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
@@ -645,6 +645,15 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
source.runWith(Sink.head).futureValue should ===(List(2, 4, 6))
}
+
+ "mapOption" in {
+ val flow = Flow[Int].mapOption {
+ case x if x % 2 == 0 => Some(x * 2)
+ case _ => None
+ }
+ val result = Source(1 to 5).via(flow).runWith(Sink.seq).futureValue
+ result should ===(Seq(4, 8))
+ }
}
object TestException extends RuntimeException with NoStackTrace
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index f81bc8f3e5..5c3a8f88ee 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -578,4 +578,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
.expectComplete()
}
}
+
+ "Source mapOption" must {
+ "map and filter elements" in {
+ Source(1 to 5)
+ .mapOption { n =>
+ if (n % 2 == 0) Some(n * 10)
+ else None
+ }
+ .runWith(TestSink[Int]())
+ .request(5)
+ .expectNext(20, 40)
+ .expectComplete()
+ }
+ }
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 5e0a97b1f7..2d4bd30c98 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -30,6 +30,7 @@ import pekko.stream.Attributes._
// stage specific default attributes
val map = name("map")
+ val mapOption = name("mapOption")
val contramap = name("contramap")
val dimap = name("dimap")
val log = name("log")
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 35619f491f..1c985575d3 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -38,6 +38,7 @@ import pekko.japi.Pair
import pekko.japi.function
import pekko.japi.function.Creator
import pekko.stream.{ javadsl, _ }
+import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
import pekko.util.ConstantFun
import pekko.util.Timeout
@@ -678,6 +679,27 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def map[T](f: function.Function[Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.map(f.apply))
+ /**
+ * Transform each input element into an `Optional` of an output element.
+ * If the mapping function returns `Optional.empty()`, the element is filtered out.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the mapping function returns a non-empty `Optional`
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def mapOption[T](f: function.Function[Out, Optional[T]]): javadsl.Flow[In, T, Mat] =
+ new Flow(delegate.map(f(_)).collect {
+ case e if e.isPresent => e.get()
+ }.addAttributes(DefaultAttributes.mapOption))
+
/**
* This is a simplified version of `wireTap(Sink)` that takes only a simple procedure.
* Elements will be passed into this "side channel" function, and any of its results will be ignored.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 1f5468961f..55128fd3a1 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -37,6 +37,7 @@ import pekko.japi.{ function, JavaPartialFunction, Pair }
import pekko.japi.function.Creator
import pekko.stream._
import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava }
+import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.{ ArraySource, StatefulMapConcat, ZipWithIndexJava }
import pekko.util._
@@ -2077,6 +2078,27 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def map[T](f: function.Function[Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.map(f.apply))
+ /**
+ * Transform each input element into an `Optional` of an output element.
+ * If the mapping function returns `Optional.empty()`, the element is filtered out.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the mapping function returns a non-empty `Optional`
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def mapOption[T](f: function.Function[Out, Optional[T]]): javadsl.Source[T, Mat] =
+ new Source(delegate.map(f(_)).collect {
+ case e if e.isPresent => e.get()
+ }.addAttributes(DefaultAttributes.mapOption))
+
/**
* This is a simplified version of `wireTap(Sink)` that takes only a simple procedure.
* Elements will be passed into this "side channel" function, and any of its results will be ignored.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
index f5660192a7..0525aa9cb7 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala
@@ -31,6 +31,7 @@ import pekko.NotUsed
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, Pair }
import pekko.stream._
+import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
import pekko.util.ConstantFun
@@ -153,6 +154,27 @@ final class SubFlow[In, Out, Mat](
def map[T](f: function.Function[Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.map(f.apply))
+ /**
+ * Transform each input element into an `Optional` of an output element.
+ * If the mapping function returns `Optional.empty()`, the element is filtered out.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the mapping function returns a non-empty `Optional`
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def mapOption[T](f: function.Function[Out, Optional[T]]): SubFlow[In, T, Mat] =
+ new SubFlow(delegate.map(f(_)).collect {
+ case e if e.isPresent => e.get()
+ }.addAttributes(DefaultAttributes.mapOption))
+
/**
* This is a simplified version of `wireTap(Sink)` that takes only a simple procedure.
* Elements will be passed into this "side channel" function, and any of its results will be ignored.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
index e7806c1f23..d93246a5a3 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala
@@ -31,6 +31,7 @@ import pekko.NotUsed
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, Pair }
import pekko.stream._
+import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.impl.fusing.{ StatefulMapConcat, ZipWithIndexJava }
import pekko.util.ConstantFun
@@ -144,6 +145,27 @@ final class SubSource[Out, Mat](
def map[T](f: function.Function[Out, T]): SubSource[T, Mat] =
new SubSource(delegate.map(f.apply))
+ /**
+ * Transform each input element into an `Optional` of an output element.
+ * If the mapping function returns `Optional.empty()`, the element is filtered out.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the mapping function returns a non-empty `Optional`
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def mapOption[T](f: function.Function[Out, Optional[T]]): SubSource[T, Mat] =
+ new SubSource(delegate.map(f(_)).collect {
+ case e if e.isPresent => e.get()
+ }.addAttributes(DefaultAttributes.mapOption))
+
/**
* This is a simplified version of `wireTap(Sink)` that takes only a simple procedure.
* Elements will be passed into this "side channel" function, and any of its results will be ignored.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index e67cc1fd67..00c7cec924 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -1087,6 +1087,26 @@ trait FlowOps[+Out, +Mat] {
*/
def map[T](f: Out => T): Repr[T] = via(Map(f))
+ /**
+ * Transform each input element into an `Option` of an output element.
+ * If the function returns `Some(value)`, that value is emitted downstream.
+ * If the function returns `None`, no element is emitted downstream.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the mapping function returns `Some(value)`
+ *
+ * '''Backpressures when''' downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @since 1.3.0
+ */
+ def mapOption[T](f: Out => Option[T]): Repr[T] =
+ map(f).collect { case Some(value) => value }.addAttributes(DefaultAttributes.mapOption)
+
/**
* This is a simplified version of `wireTap(Sink)` that takes only a simple function.
* Elements will be passed into this "side channel" function, and any of its results will be ignored.
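
Note on semantics: the scaladsl implementation above is `map(f)` followed by `collect { case Some(value) => value }`. The sketch below models that same shape on a plain `List` so it can be run without a Pekko dependency. `MapOptionSketch` and its list-based `mapOption` helper are hypothetical names used for illustration only; they are not part of the commit or of the Pekko API, and a list does not reproduce backpressure or supervision behaviour.

```scala
// Hypothetical stand-in: models the operator on a strict List, not on a Pekko stream.
object MapOptionSketch {

  // Same shape as the commit's scaladsl code: map first, then keep only defined values.
  def mapOption[A, B](in: List[A])(f: A => Option[B]): List[B] =
    in.map(f).collect { case Some(value) => value }

  def main(args: Array[String]): Unit = {
    // Mirrors the SourceSpec test in this commit: even numbers are kept and multiplied by 10.
    val result = mapOption((1 to 5).toList)(n => if (n % 2 == 0) Some(n * 10) else None)
    println(result) // prints List(20, 40)
  }
}
```

With input 1 to 5 this yields 20 and 40, matching the expectations in `SourceSpec` and `SourceTest` above.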