This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new a579679445 feat: Add Sink#source (#2250)
a579679445 is described below

commit a579679445e9d47eafff7cfde62a0c2bc7268ee2
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue Sep 23 17:10:26 2025 +0800

    feat: Add Sink#source (#2250)
    
    * feat: Add Sink#source
---
 .../main/paradox/stream/operators/Sink/source.md   |  29 +++++
 docs/src/main/paradox/stream/operators/index.md    |   2 +
 ...IterablePublisherViaJavaFlowPublisherTest.scala |   1 +
 .../org/apache/pekko/stream/javadsl/SinkTest.java  |  11 ++
 .../pekko/stream/scaladsl/SourceSinkSpec.scala     |  87 +++++++++++++++
 .../org/apache/pekko/stream/impl/Stages.scala      |   1 +
 .../pekko/stream/impl/fusing/SourceSink.scala      | 121 +++++++++++++++++++++
 .../org/apache/pekko/stream/javadsl/Sink.scala     |  14 +++
 .../org/apache/pekko/stream/scaladsl/Sink.scala    |  17 ++-
 9 files changed, 282 insertions(+), 1 deletion(-)

diff --git a/docs/src/main/paradox/stream/operators/Sink/source.md 
b/docs/src/main/paradox/stream/operators/Sink/source.md
new file mode 100644
index 0000000000..6dc6f751e9
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Sink/source.md
@@ -0,0 +1,29 @@
+# Sink.source
+
+A `Sink` that materializes this `Sink` itself as a `Source`, the returning 
`Source` can only have one subscriber.
+
+@ref[Sink operators](../index.md#sink-operators)
+
+## Signature
+
+@apidoc[Sink.source](Sink$) { java="#source()" }
+@apidoc[Sink.source](Sink$) { scala="#source()" }
+
+
+## Description
+
+A `Sink` that materialize this `Sink` itself as a `Source`, the returning 
`Source` can only have one subscriber.
+
+Use `BroadcastHub.sink` if you need a `Source` that allows multiple 
subscribers.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**cancels** When the materialized `Source` is cancelled or timeout with 
subscription.
+
+**backpressures** When the materialized `Source` backpressures or not ready to 
receive elements.
+
+@@@
+
+
diff --git a/docs/src/main/paradox/stream/operators/index.md 
b/docs/src/main/paradox/stream/operators/index.md
index aee2d94e1e..da367ebad1 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -81,6 +81,7 @@ These built-in sinks are available from 
@scala[`org.apache.pekko.stream.scaladsl
 |Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` 
that can be pulled to trigger demand through the sink.|
 |Sink|<a name="reduce"></a>@ref[reduce](Sink/reduce.md)|Apply a reduction 
function on the incoming elements and pass the result to the next invocation.|
 |Sink|<a name="seq"></a>@ref[seq](Sink/seq.md)|Collect values emitted from the 
stream into a collection.|
+|Sink|<a name="source"></a>@ref[source](Sink/source.md)|A `Sink` that 
materializes this `Sink` itself as a `Source`, the returning `Source` can only 
have one subscriber.|
 |Sink|<a name="takelast"></a>@ref[takeLast](Sink/takeLast.md)|Collect the last 
`n` values emitted from the stream into a collection.|
 
 ## Additional Sink and Source converters
@@ -575,6 +576,7 @@ For more background see the @ref[Error Handling in 
Streams](../stream-error.md)
 * [single](Source/single.md)
 * [sink](PubSub/sink.md)
 * [sliding](Source-or-Flow/sliding.md)
+* [source](Sink/source.md)
 * [source](PubSub/source.md)
 * [splitAfter](Source-or-Flow/splitAfter.md)
 * [splitWhen](Source-or-Flow/splitWhen.md)
diff --git 
a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala
 
b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala
index 44706c2456..b81ca2c16b 100644
--- 
a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala
+++ 
b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala
@@ -18,6 +18,7 @@ import java.util.concurrent.{ Flow => JavaFlow }
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.stream.scaladsl.{ JavaFlowSupport, Sink, Source }
+
 import org.reactivestreams._
 
 class IterablePublisherViaJavaFlowPublisherTest extends 
PekkoPublisherVerification[Int] {
diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java 
b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
index 882cf0ccbf..c912e734aa 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java
@@ -269,4 +269,15 @@ public class SinkTest extends StreamTest {
     CompletionStage<Long> cs = Source.range(1, 10).runWith(Sink.count(), 
system);
     Assert.assertEquals(10, cs.toCompletableFuture().join().longValue());
   }
+
+  @Test
+  public void mustBeAbleToUseSinkAsSource() throws Exception {
+    final List<Integer> r =
+        Source.range(1, 10)
+            .runWith(Sink.source(), system)
+            .runWith(Sink.seq(), system)
+            .toCompletableFuture()
+            .get(1, TimeUnit.SECONDS);
+    assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), r);
+  }
 }
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala
new file mode 100644
index 0000000000..73f728bbdb
--- /dev/null
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.stream.{ Attributes, StreamSubscriptionTimeoutTerminationMode }
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
+
+class SourceSinkSpec extends StreamSpec("""
+    pekko.stream.materializer.initial-input-buffer-size = 2
+  """) {
+
+  "Sink.toSeq" must {
+    "Can be used as a Source with run twice" in {
+      val s = Source(1 to 6).runWith(Sink.source)
+      s.runWith(Sink.seq).futureValue should be(1 to 6)
+    }
+
+    "Can complete when upstream completes without elements" in {
+      val s = Source.empty.runWith(Sink.source)
+      s.runWith(Sink.seq).futureValue should be(Nil)
+    }
+
+    "Can cancel when down stream cancel" in {
+      val (pub, source) = TestSource.probe[Int]
+        .toMat(Sink.source)(Keep.both)
+        .run()
+      val sub = source.runWith(TestSink.probe[Int])
+      pub.ensureSubscription()
+      sub.ensureSubscription()
+      sub.cancel()
+      pub.expectCancellation()
+    }
+
+    "Can timeout when no subscription" in {
+      import scala.concurrent.duration._
+      val (pub, source) = TestSource.probe[Int]
+        .toMat(Sink.source)(Keep.both)
+        .addAttributes(Attributes(
+          StreamSubscriptionTimeout(
+            2.seconds,
+            StreamSubscriptionTimeoutTerminationMode.cancel
+          )
+        ))
+        .run()
+      pub.expectCancellation()
+      Thread.sleep(1000) // wait a bit
+      val sub = source.runWith(TestSink.probe)
+      sub.expectSubscription()
+      sub.expectError()
+    }
+
+    "Can backpressure" in {
+      Source.iterate(1)(_ => true, _ + 1)
+        .runWith(Sink.source).runWith(TestSink.probe[Int])
+        .request(3)
+        .expectNext(1, 2, 3)
+        .request(2)
+        .expectNext(4, 5)
+        .cancel()
+    }
+
+    "Can use with mapMaterializedValue" in {
+      val sink = Sink.source[Int].mapMaterializedValue(_.runWith(Sink.seq))
+      Source(1 to 5)
+        .runWith(sink)
+        .futureValue should be(1 to 5)
+    }
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
index 9c7c1d56e1..58ea8b5bd5 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
@@ -156,6 +156,7 @@ import pekko.stream.Attributes._
     val seqSink = name("seqSink")
     val countSink = name("countSink")
     val publisherSink = name("publisherSink")
+    val sourceSink = name("sourceSink")
     val fanoutPublisherSink = name("fanoutPublisherSink")
     val ignoreSink = name("ignoreSink")
     val neverSink = name("neverSink")
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala
new file mode 100644
index 0000000000..db58cc93e6
--- /dev/null
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.impl.fusing
+
+import org.apache.pekko
+import org.apache.pekko.stream.impl.Stages.DefaultAttributes
+import pekko.NotUsed
+import pekko.annotation.InternalApi
+import pekko.stream.{ ActorAttributes, Attributes, Inlet, SinkShape, 
StreamSubscriptionTimeoutTerminationMode }
+import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
+import pekko.stream.scaladsl.Source
+import pekko.stream.stage.{
+  GraphStageLogic,
+  GraphStageWithMaterializedValue,
+  InHandler,
+  OutHandler,
+  TimerGraphStageLogic
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[pekko] object SourceSink
+    extends GraphStageWithMaterializedValue[SinkShape[Any], Source[Any, 
NotUsed]] {
+  private val SubscriptionTimerKey = "SubstreamSubscriptionTimerKey"
+  private val in = Inlet[Any]("sourceSink.in")
+  override val shape = SinkShape(in)
+
+  override def toString: String = "SourceSink"
+  override protected def initialAttributes: Attributes = 
DefaultAttributes.sourceSink
+
+  override def createLogicAndMaterializedValue(
+      inheritedAttributes: Attributes): (GraphStageLogic, Source[Any, 
NotUsed]) = {
+
+    /**
+     * NOTE: in the current implementation of Pekko Stream,
+     * We have to materialization twice to do the piping, which means, even we 
can treat the Sink as a Source.
+     *
+     * In an idea word this stage should be purged out by the materializer 
optimization,
+     * and we can directly connect the upstream to the downstream.
+     */
+    object logic extends TimerGraphStageLogic(shape) with InHandler with 
OutHandler { self =>
+      val sinkSource = new SubSourceOutlet[Any]("sinkSource")
+
+      private def subHandler(): OutHandler = new OutHandler {
+        override def onPull(): Unit = {
+          setKeepGoing(false)
+          cancelTimer(SubscriptionTimerKey)
+          pull(in)
+          sinkSource.setHandler(self)
+        }
+        override def onDownstreamFinish(cause: Throwable): Unit = 
self.onDownstreamFinish(cause)
+      }
+
+      override def preStart(): Unit = {
+        sinkSource.setHandler(subHandler())
+        setKeepGoing(true)
+        val timeout = 
inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
+        scheduleOnce(SubscriptionTimerKey, timeout)
+      }
+
+      override protected def onTimer(timerKey: Any): Unit = {
+        val materializer = interpreter.materializer
+        val StreamSubscriptionTimeout(timeout, mode) =
+          
inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout]
+
+        mode match {
+          case StreamSubscriptionTimeoutTerminationMode.CancelTermination =>
+            sinkSource.timeout(timeout)
+            if (sinkSource.isClosed)
+              completeStage()
+          case StreamSubscriptionTimeoutTerminationMode.NoopTermination =>
+          // do nothing
+          case StreamSubscriptionTimeoutTerminationMode.WarnTermination =>
+            materializer.logger.warning(
+              "Substream subscription timeout triggered after {} in 
SourceSink.",
+              timeout)
+        }
+      }
+
+      override def onPush(): Unit = sinkSource.push(grab(in))
+      override def onPull(): Unit = pull(in)
+
+      override def onUpstreamFinish(): Unit = {
+        if (!sinkSource.isClosed) {
+          sinkSource.complete()
+        }
+        completeStage()
+      }
+
+      override def onUpstreamFailure(ex: Throwable): Unit = if 
(!sinkSource.isClosed) {
+        sinkSource.fail(ex)
+        completeStage()
+      } else failStage(ex)
+
+      override def onDownstreamFinish(cause: Throwable): Unit = {
+        // cancel upstream only if the substream was cancelled
+        if (!isClosed(in)) cancelStage(cause)
+      }
+
+      setHandler(in, this)
+    }
+
+    (logic, Source.fromGraph(logic.sinkSource.source))
+  }
+}
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index 7894124d50..8cdfc87b13 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -212,6 +212,20 @@ object Sink {
   def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] =
     new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT))
 
+  /**
+   * A `Sink` that materializes this `Sink` itself as a `Source`.
+   * The returned `Source` is a "live view" onto the `Sink` and only supports 
a single `Subscriber`.
+   *
+   * Use [[BroadcastHub#sink]] if you need a `Source` that allows multiple 
subscribers.
+   *
+   * Note: even if the `Source` is directly connected to the `Sink`, there is 
still an asynchronous boundary
+   * between them; performance may be improved in the future.
+   *
+   * @since 2.0.0
+   */
+  def source[T](): Sink[T, Source[T, NotUsed]] = new 
Sink(scaladsl.Sink.source[T])
+    .mapMaterializedValue(src => src.asJava)
+
   /**
    * A `Sink` that will invoke the given procedure for each received element. 
The sink is materialized
    * into a [[java.util.concurrent.CompletionStage]] which will be completed 
with `Success` when reaching the
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index f5dc532eed..96d7fe622b 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -27,7 +27,7 @@ import pekko.annotation.InternalApi
 import pekko.stream._
 import pekko.stream.impl._
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.{ CountSink, GraphStages }
+import pekko.stream.impl.fusing.{ CountSink, GraphStages, SourceSink }
 import pekko.stream.stage._
 
 import org.reactivestreams.{ Publisher, Subscriber }
@@ -312,6 +312,21 @@ object Sink {
       if (fanout) new 
FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, 
shape("FanoutPublisherSink"))
       else new PublisherSink[T](DefaultAttributes.publisherSink, 
shape("PublisherSink")))
 
+  /**
+   * A `Sink` that materializes this `Sink` itself as a `Source`.
+   * The returned `Source` is a "live view" onto the `Sink` and only supports 
a single `Subscriber`.
+   *
+   * Use [[BroadcastHub#sink]] if you need a `Source` that allows multiple 
subscribers.
+   *
+   * Note: even if the `Source` is directly connected to the `Sink`, there is 
still an asynchronous boundary
+   * between them; performance may be improved in the future.
+   *
+   * @since 2.0.0
+   */
+  def source[T]: Sink[T, Source[T, NotUsed]] = 
_sourceSink.asInstanceOf[Sink[T, Source[T, NotUsed]]]
+
+  private[this] val _sourceSink = fromGraph(SourceSink)
+
   /**
    * A `Sink` that will consume the stream and discard the elements.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to