This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new b13f2b193b feat: Add fromOption operator (#2413) (#2415)
b13f2b193b is described below
commit b13f2b193bb9b7cb4990f397d3d2206f55e634a2
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun Nov 2 16:05:04 2025 +0800
feat: Add fromOption operator (#2413) (#2415)
(cherry picked from commit c37cbc61920e3060dfbbfa797225b753893b65cb)
---
.../paradox/stream/operators/Source/fromOption.md | 26 ++++++++++++++++++++++
docs/src/main/paradox/stream/operators/index.md | 6 ++++-
.../apache/pekko/stream/javadsl/SourceTest.java | 17 ++++++++++++++
.../pekko/stream/DslFactoriesConsistencySpec.scala | 1 +
.../apache/pekko/stream/scaladsl/SourceSpec.scala | 17 ++++++++++++++
.../org/apache/pekko/stream/javadsl/Source.scala | 8 +++++++
.../org/apache/pekko/stream/scaladsl/Source.scala | 10 +++++++++
7 files changed, 84 insertions(+), 1 deletion(-)
diff --git a/docs/src/main/paradox/stream/operators/Source/fromOption.md
b/docs/src/main/paradox/stream/operators/Source/fromOption.md
new file mode 100644
index 0000000000..770fb18cc0
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Source/fromOption.md
@@ -0,0 +1,26 @@
+# Source.fromOption
+
+Create a `Source` from an @scala[`Option[T]`] @java[`Optional<T>`] value,
emitting the value if it is present.
+
+@ref[Source operators](../index.md#source-operators)
+
+
+## Signature
+
+@apidoc[Source.fromOption](Source$) { }
+
+
+## Description
+
+Create a `Source` from an @scala[`Option[T]`] @java[`Optional<T>`] value,
emitting the value if it is present. If the value is absent, the source completes without emitting any element.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the value is present
+
+**completes** after the value has been emitted, or immediately if the value is absent
+
+@@@
+
diff --git a/docs/src/main/paradox/stream/operators/index.md
b/docs/src/main/paradox/stream/operators/index.md
index 73db178be4..0436088b04 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -22,6 +22,7 @@ These built-in sources are available from
@scala[`org.apache.pekko.stream.scalad
|Source|<a
name="fromfuturesource"></a>@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated
by @ref[`Source.futureSource`](Source/futureSource.md).|
|Source|<a
name="fromiterator"></a>@ref[fromIterator](Source/fromIterator.md)|Stream the
values from an `Iterator`, requesting the next value when there is demand.|
|Source|<a
name="fromjavastream"></a>@ref[fromJavaStream](Source/fromJavaStream.md)|Stream
the values from a Java 8 `Stream`, requesting the next value when there is
demand.|
+|Source|<a name="fromoption"></a>@ref[fromOption](Source/fromOption.md)|Create
a `Source` from an @scala[`Option[T]`] @java[`Optional<T>`] value, emitting the
value if it is present.|
|Source|<a
name="frompublisher"></a>@ref[fromPublisher](Source/fromPublisher.md)|Integration
with Reactive Streams, subscribes to a
@javadoc[Publisher](java.util.concurrent.Flow.Publisher).|
|Source|<a
name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated
by @ref[`Source.completionStageSource`](Source/completionStageSource.md).|
|Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single
value of the `Future` when it completes and there is demand.|
@@ -377,6 +378,7 @@ Flow operators to (de)compress.
|Compression|<a
name="deflate"></a>@ref[deflate](Compression/deflate.md)|Creates a flow that
deflate-compresses a stream of ByteStrings. |
|Compression|<a name="gunzip"></a>@ref[gunzip](Compression/gunzip.md)|Creates
a flow that gzip-decompresses a stream of ByteStrings. |
|Compression|<a name="gzip"></a>@ref[gzip](Compression/gzip.md)|Creates a flow
that gzip-compresses a stream of ByteStrings. |
+|Compression|<a
name="gzipdecompress"></a>@ref[gzipDecompress](Compression/gzipDecompress.md)|Creates
a flow that gzip-decompresses a stream of ByteStrings. |
|Compression|<a
name="inflate"></a>@ref[inflate](Compression/inflate.md)|Creates a flow that
deflate-decompresses a stream of ByteStrings. |
## Error handling
@@ -498,6 +500,7 @@ For more background see the @ref[Error Handling in
Streams](../stream-error.md)
* [fromJavaStream](StreamConverters/fromJavaStream.md)
* [fromMaterializer](Source-or-Flow/fromMaterializer.md)
* [fromMaterializer](Sink/fromMaterializer.md)
+* [fromOption](Source/fromOption.md)
* [fromOutputStream](StreamConverters/fromOutputStream.md)
* [fromPath](FileIO/fromPath.md)
* [fromPublisher](Source/fromPublisher.md)
@@ -516,8 +519,9 @@ For more background see the @ref[Error Handling in
Streams](../stream-error.md)
* [groupedWeighted](Source-or-Flow/groupedWeighted.md)
* [groupedWeightedWithin](Source-or-Flow/groupedWeightedWithin.md)
* [groupedWithin](Source-or-Flow/groupedWithin.md)
-* [gzipDecompress](Compression/gzipDecompress.md)
+* [gunzip](Compression/gunzip.md)
* [gzip](Compression/gzip.md)
+* [gzipDecompress](Compression/gzipDecompress.md)
* [head](Sink/head.md)
* [headOption](Sink/headOption.md)
* [idleTimeout](Source-or-Flow/idleTimeout.md)
diff --git
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
index 081f657820..affa189036 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java
@@ -1870,4 +1870,21 @@ public class SourceTest extends StreamTest {
.expectNext(0)
.expectComplete();
}
+
+ @Test
+ public void mustBeAbleToCreateFromOption() throws Exception {
+ final Integer value =
+ Source.fromOption(Optional.of(42))
+ .runWith(Sink.head(), system)
+ .toCompletableFuture()
+ .get(3, TimeUnit.SECONDS);
+ assertEquals((Integer) 42, value);
+ //
+ final Optional<Integer> empty =
+ Source.fromOption(Optional.<Integer>empty())
+ .runWith(Sink.headOption(), system)
+ .toCompletableFuture()
+ .get(3, TimeUnit.SECONDS);
+ assertEquals(Optional.empty(), empty);
+ }
}
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
index 0d3de6261f..93defd9017 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala
@@ -64,6 +64,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with
Matchers {
(classOf[scala.collection.Seq[_]],
classOf[java.util.List[_]]) ::
(classOf[scala.collection.immutable.Seq[_]],
classOf[java.util.List[_]]) ::
(classOf[scala.collection.immutable.Set[_]],
classOf[java.util.Set[_]]) ::
+ (classOf[scala.Option[_]],
classOf[java.util.Optional[_]]) ::
(classOf[Boolean],
classOf[pekko.stream.javadsl.AsPublisher]) ::
(classOf[scala.Function0[_]],
classOf[pekko.japi.function.Creator[_]]) ::
(classOf[scala.Function0[_]],
classOf[java.util.concurrent.Callable[_]]) ::
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
index cb37d951e6..f81bc8f3e5 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala
@@ -561,4 +561,21 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
.expectComplete()
}
}
+
+ "Source from option" must {
+ "produce one element when Some" in {
+ Source.fromOption(Some(42))
+ .runWith(TestSink[Int]())
+ .request(1)
+ .expectNext(42)
+ .expectComplete()
+ }
+
+ "complete immediately when None" in {
+ Source.fromOption(None)
+ .runWith(TestSink[Int]())
+ .request(1)
+ .expectComplete()
+ }
+ }
}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index d2484f89a0..c860816154 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -203,6 +203,14 @@ object Source {
def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new
Source(scaladsl.Source.fromGraph(
new ArraySource[T](array)))
+ /**
+ * Create a `Source` from an `Optional` value, emitting the value if it is
present; an empty `Optional` yields a source that completes without emitting.
+ *
+ * @since 1.3.0
+ */
+ def fromOption[T](optional: Optional[T]): Source[T, NotUsed] =
+ if (optional.isPresent) single(optional.get()) else empty()
+
/**
* Creates [[Source]] that represents integer values in range
''[start;end]'', step equals to 1.
* It allows to create `Source` out of range as simply as on Scala `Source(1
to N)`
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index e7f821cb87..306dde3ec3 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -480,6 +480,16 @@ object Source {
def single[T](element: T): Source[T, NotUsed] =
fromGraph(new GraphStages.SingleSource(element))
+ /**
+ * Create a `Source` from an `Option` value, emitting the value if it is
defined; `None` yields a source that completes without emitting.
+ *
+ * @since 1.3.0
+ */
+ def fromOption[T](option: Option[T]): Source[T, NotUsed] = option match {
+ case Some(value) => single(value)
+ case None => empty
+ }
+
/**
* Create a `Source` that will continually emit the given element.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]