This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new e0f6a88435 =str Switch the type parameter order of 
UnfoldResourceSource. (#615)
e0f6a88435 is described below

commit e0f6a88435199982d12e303d1d9889cd2333ec3d
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Aug 2 17:30:06 2025 +0800

    =str Switch the type parameter order of UnfoldResourceSource. (#615)
---
 .../test/scala/org/apache/pekko/stream/FusingSpec.scala    |  6 +++---
 .../apache/pekko/stream/impl/UnfoldResourceSource.scala    | 10 +++++-----
 .../scala/org/apache/pekko/stream/javadsl/Source.scala     | 14 +++++++++-----
 .../scala/org/apache/pekko/stream/scaladsl/Source.scala    |  4 +++-
 4 files changed, 20 insertions(+), 14 deletions(-)

diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/FusingSpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/FusingSpec.scala
index 59299c9ae2..06be646aa5 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/FusingSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/FusingSpec.scala
@@ -103,7 +103,7 @@ class FusingSpec extends StreamSpec {
     }
 
     // an UnfoldResourceSource equivalent without an async boundary
-    case class UnfoldResourceNoAsyncBoundry[T, S](create: () => S, readData: 
(S) => Option[T], close: (S) => Unit)
+    case class UnfoldResourceNoAsyncBoundary[R, T](create: () => R, readData: 
(R) => Option[T], close: (R) => Unit)
         extends GraphStage[SourceShape[T]] {
       val stage_ = new UnfoldResourceSource(create, readData, close)
       override def initialAttributes: Attributes = Attributes.none
@@ -114,7 +114,7 @@ class FusingSpec extends StreamSpec {
 
     "propagate downstream errors through async boundary" in {
       val promise = Promise[Done]()
-      val slowInitSrc = UnfoldResourceNoAsyncBoundry(
+      val slowInitSrc = UnfoldResourceNoAsyncBoundary(
         () => { Await.result(promise.future, 1.minute); () },
         (_: Unit) => Some(1),
         (_: Unit) => ()).asSource.watchTermination()(Keep.right).async // 
commenting this out, makes the test pass
@@ -145,7 +145,7 @@ class FusingSpec extends StreamSpec {
 
     "propagate 'parallel' errors through async boundary via a common 
downstream" in {
       val promise = Promise[Done]()
-      val slowInitSrc = UnfoldResourceNoAsyncBoundry(
+      val slowInitSrc = UnfoldResourceNoAsyncBoundary(
         () => { Await.result(promise.future, 1.minute); () },
         (_: Unit) => Some(1),
         (_: Unit) => ()).asSource.watchTermination()(Keep.right).async // 
commenting this out, makes the test pass
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala 
b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala
index da0a6ee32d..00d48e471b 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala
@@ -27,10 +27,10 @@ import pekko.stream.stage._
 /**
  * INTERNAL API
  */
-@InternalApi private[pekko] final class UnfoldResourceSource[T, S](
-    create: () => S,
-    readData: (S) => Option[T],
-    close: (S) => Unit)
+@InternalApi private[pekko] final class UnfoldResourceSource[R, T](
+    create: () => R,
+    readData: (R) => Option[T],
+    close: (R) => Unit)
     extends GraphStage[SourceShape[T]] {
   val out = Outlet[T]("UnfoldResourceSource.out")
   override val shape = SourceShape(out)
@@ -41,7 +41,7 @@ import pekko.stream.stage._
     new GraphStageLogic(shape) with OutHandler {
       private lazy val decider = 
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
       private var open = false
-      private var resource: S = _
+      private var resource: R = _
 
       override def preStart(): Unit = {
         resource = create()
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 0e5025ce8c..33674833da 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -41,6 +41,7 @@ import pekko.util.JavaDurationConverters._
 import pekko.util.OptionConverters._
 import pekko.util.ccompat.JavaConverters._
 import pekko.util.ccompat._
+
 import org.reactivestreams.{ Publisher, Subscriber }
 
 /** Java API */
@@ -934,12 +935,15 @@ object Source {
    * @param read - function that reads data from opened resource. It is called 
each time backpressure signal
    *             is received. Stream calls close and completes when `read` 
returns an empty Optional.
    * @param close - function that closes resource
+   * @tparam T - the element type
+   * @tparam R - the resource type
    */
-  def unfoldResource[T, S](
-      create: function.Creator[S],
-      read: function.Function[S, Optional[T]],
-      close: function.Procedure[S]): javadsl.Source[T, NotUsed] =
-    new Source(scaladsl.Source.unfoldResource[T, S](create.create _, (s: S) => 
read.apply(s).toScala, close.apply))
+  def unfoldResource[T, R](
+      create: function.Creator[R],
+      read: function.Function[R, Optional[T]],
+      close: function.Procedure[R]): javadsl.Source[T, NotUsed] =
+    new Source(scaladsl.Source.unfoldResource[T, R](create.create _, 
(resource: R) => read.apply(resource).toScala,
+      close.apply))
 
   /**
    * Start a new `Source` from some resource which can be opened, read and 
closed.
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
index aab2ce3f78..e8b185fa39 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala
@@ -1075,8 +1075,10 @@ object Source {
    * @param read - function that reads data from opened resource. It is called 
each time backpressure signal
    *             is received. Stream calls close and completes when `read` 
returns None.
    * @param close - function that closes resource
+   * @tparam T - the element type
+   * @tparam R - the resource type.
    */
-  def unfoldResource[T, S](create: () => S, read: (S) => Option[T], close: (S) 
=> Unit): Source[T, NotUsed] =
+  def unfoldResource[T, R](create: () => R, read: (R) => Option[T], close: (R) 
=> Unit): Source[T, NotUsed] =
     Source.fromGraph(new UnfoldResourceSource(create, read, close))
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to