This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch fixRecover
in repository https://gitbox.apache.org/repos/asf/pekko.git

commit 57a9853941e516dc601832a1a6ec64d96e315526
Author: He-Pin <[email protected]>
AuthorDate: Thu Mar 5 01:42:42 2026 +0800

    Revert "Revert recent changes to recoverWith (#2674)"
    
    This reverts commit c9906c60fcfac696cf307ca0edc8a80da202dde1.
---
 .../org/apache/pekko/stream/impl/fusing/Ops.scala  | 38 ++++++++++++++++++----
 1 file changed, 31 insertions(+), 7 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 0635e52d51..d02744c8d3 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -36,9 +36,16 @@ import pekko.stream.Attributes.{ InputBuffer, LogLevels }
 import pekko.stream.Attributes.SourceLocation
 import pekko.stream.OverflowStrategies._
 import pekko.stream.Supervision.Decider
-import pekko.stream.impl.{ Buffer => BufferImpl, ContextPropagation, ReactiveStreamsCompliance, TraversalBuilder }
+import pekko.stream.impl.{
+  Buffer => BufferImpl,
+  ContextPropagation,
+  FailedSource,
+  JavaStreamSource,
+  ReactiveStreamsCompliance,
+  TraversalBuilder
+}
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
+import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SimpleLinearGraphStage, SingleSource }
 import pekko.stream.scaladsl.{
   DelayStrategy,
   Source,
@@ -2162,6 +2169,7 @@ private[pekko] object TakeWithin {
       override def onPull(): Unit = pull(in)
 
       @nowarn("msg=Any")
+      @tailrec
       def onFailure(ex: Throwable): Unit = {
         import Collect.NotApplied
         if (maximumRetries < 0 || attempt < maximumRetries) {
@@ -2169,12 +2177,28 @@ private[pekko] object TakeWithin {
             case _: NotApplied.type                                            
                                   => failStage(ex)
             case source: Graph[SourceShape[T] @unchecked, M @unchecked] if TraversalBuilder.isEmptySource(source) =>
               completeStage()
-            case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
-              TraversalBuilder.getSingleSource(other) match {
-                case OptionVal.Some(singleSource) =>
-                  emit(out, singleSource.elem.asInstanceOf[T], () => completeStage())
+            case source: Graph[SourceShape[T] @unchecked, M @unchecked] =>
+              TraversalBuilder.getValuePresentedSource(source) match {
+                case OptionVal.Some(graph) => graph match {
+                    case singleSource: SingleSource[T @unchecked] => emit(out, singleSource.elem, () => completeStage())
+                    case failed: FailedSource[T @unchecked]       => onFailure(failed.failure)
+                    case futureSource: FutureSource[T @unchecked] => futureSource.future.value match {
+                        case Some(Success(elem)) => emit(out, elem, () => completeStage())
+                        case Some(Failure(ex))   => onFailure(ex)
+                        case None                =>
+                          switchTo(source)
+                          attempt += 1
+                      }
+                    case iterableSource: IterableSource[T @unchecked] =>
+                      emitMultiple(out, iterableSource.elements, () => completeStage())
+                    case javaStreamSource: JavaStreamSource[T @unchecked, _] =>
+                      emitMultiple(out, javaStreamSource.open().spliterator(), () => completeStage())
+                    case _ =>
+                      switchTo(source)
+                      attempt += 1
+                  }
                 case _ =>
-                  switchTo(other)
+                  switchTo(source)
                   attempt += 1
               }
             case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to