This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new e0f6a88435 =str Switch the type parameter order of UnfoldResourceSource. (#615)
e0f6a88435 is described below
commit e0f6a88435199982d12e303d1d9889cd2333ec3d
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Aug 2 17:30:06 2025 +0800
=str Switch the type parameter order of UnfoldResourceSource. (#615)
---
.../test/scala/org/apache/pekko/stream/FusingSpec.scala | 6 +++---
.../apache/pekko/stream/impl/UnfoldResourceSource.scala | 10 +++++-----
.../scala/org/apache/pekko/stream/javadsl/Source.scala | 14 +++++++++-----
.../scala/org/apache/pekko/stream/scaladsl/Source.scala | 4 +++-
4 files changed, 20 insertions(+), 14 deletions(-)
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/FusingSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/FusingSpec.scala
index 59299c9ae2..06be646aa5 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/FusingSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/FusingSpec.scala
@@ -103,7 +103,7 @@ class FusingSpec extends StreamSpec {
}
// an UnfoldResourceSource equivalent without an async boundary
- case class UnfoldResourceNoAsyncBoundry[T, S](create: () => S, readData: (S) => Option[T], close: (S) => Unit)
+ case class UnfoldResourceNoAsyncBoundary[R, T](create: () => R, readData: (R) => Option[T], close: (R) => Unit)
extends GraphStage[SourceShape[T]] {
val stage_ = new UnfoldResourceSource(create, readData, close)
override def initialAttributes: Attributes = Attributes.none
@@ -114,7 +114,7 @@ class FusingSpec extends StreamSpec {
"propagate downstream errors through async boundary" in {
val promise = Promise[Done]()
- val slowInitSrc = UnfoldResourceNoAsyncBoundry(
+ val slowInitSrc = UnfoldResourceNoAsyncBoundary(
() => { Await.result(promise.future, 1.minute); () },
(_: Unit) => Some(1),
(_: Unit) => ()).asSource.watchTermination()(Keep.right).async // commenting this out, makes the test pass
@@ -145,7 +145,7 @@ class FusingSpec extends StreamSpec {
"propagate 'parallel' errors through async boundary via a common downstream" in {
val promise = Promise[Done]()
- val slowInitSrc = UnfoldResourceNoAsyncBoundry(
+ val slowInitSrc = UnfoldResourceNoAsyncBoundary(
() => { Await.result(promise.future, 1.minute); () },
(_: Unit) => Some(1),
(_: Unit) => ()).asSource.watchTermination()(Keep.right).async // commenting this out, makes the test pass
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala
index da0a6ee32d..00d48e471b 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala
@@ -27,10 +27,10 @@ import pekko.stream.stage._
/**
* INTERNAL API
*/
-@InternalApi private[pekko] final class UnfoldResourceSource[T, S](
- create: () => S,
- readData: (S) => Option[T],
- close: (S) => Unit)
+@InternalApi private[pekko] final class UnfoldResourceSource[R, T](
+ create: () => R,
+ readData: (R) => Option[T],
+ close: (R) => Unit)
extends GraphStage[SourceShape[T]] {
val out = Outlet[T]("UnfoldResourceSource.out")
override val shape = SourceShape(out)
@@ -41,7 +41,7 @@ import pekko.stream.stage._
new GraphStageLogic(shape) with OutHandler {
private lazy val decider =
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var open = false
- private var resource: S = _
+ private var resource: R = _
override def preStart(): Unit = {
resource = create()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 0e5025ce8c..33674833da 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -41,6 +41,7 @@ import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._
import pekko.util.ccompat._
+
import org.reactivestreams.{ Publisher, Subscriber }
/** Java API */
@@ -934,12 +935,15 @@ object Source {
* @param read - function that reads data from opened resource. It is called each time backpressure signal
* is received. Stream calls close and completes when `read` returns an empty Optional.
* @param close - function that closes resource
+ * @tparam T - the element type
+ * @tparam R - the resource type
*/
- def unfoldResource[T, S](
- create: function.Creator[S],
- read: function.Function[S, Optional[T]],
- close: function.Procedure[S]): javadsl.Source[T, NotUsed] =
- new Source(scaladsl.Source.unfoldResource[T, S](create.create _, (s: S) => read.apply(s).toScala, close.apply))
+ def unfoldResource[T, R](
+ create: function.Creator[R],
+ read: function.Function[R, Optional[T]],
+ close: function.Procedure[R]): javadsl.Source[T, NotUsed] =
+ new Source(scaladsl.Source.unfoldResource[T, R](create.create _, (resource: R) => read.apply(resource).toScala,
+ close.apply))
/**
* Start a new `Source` from some resource which can be opened, read and closed.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index aab2ce3f78..e8b185fa39 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -1075,8 +1075,10 @@ object Source {
* @param read - function that reads data from opened resource. It is called each time backpressure signal
* is received. Stream calls close and completes when `read` returns None.
* @param close - function that closes resource
+ * @tparam T - the element type
+ * @tparam R - the resource type.
*/
- def unfoldResource[T, S](create: () => S, read: (S) => Option[T], close: (S) => Unit): Source[T, NotUsed] =
+ def unfoldResource[T, R](create: () => R, read: (R) => Option[T], close: (R) => Unit): Source[T, NotUsed] =
Source.fromGraph(new UnfoldResourceSource(create, read, close))
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]