This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new c9906c60fc Revert recent changes to recoverWith (#2674)
c9906c60fc is described below

commit c9906c60fcfac696cf307ca0edc8a80da202dde1
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Feb 23 20:04:48 2026 +0100

    Revert recent changes to recoverWith (#2674)
    
    * revert recent changes to recoverWith
    
    Update Ops.scala
    
    revert recent changes to recoverWith
    
    * scalafmt
---
 .../org/apache/pekko/stream/impl/fusing/Ops.scala  | 38 ++++------------------
 1 file changed, 7 insertions(+), 31 deletions(-)

diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index d02744c8d3..0635e52d51 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -36,16 +36,9 @@ import pekko.stream.Attributes.{ InputBuffer, LogLevels }
 import pekko.stream.Attributes.SourceLocation
 import pekko.stream.OverflowStrategies._
 import pekko.stream.Supervision.Decider
-import pekko.stream.impl.{
-  Buffer => BufferImpl,
-  ContextPropagation,
-  FailedSource,
-  JavaStreamSource,
-  ReactiveStreamsCompliance,
-  TraversalBuilder
-}
+import pekko.stream.impl.{ Buffer => BufferImpl, ContextPropagation, 
ReactiveStreamsCompliance, TraversalBuilder }
 import pekko.stream.impl.Stages.DefaultAttributes
-import pekko.stream.impl.fusing.GraphStages.{ FutureSource, 
SimpleLinearGraphStage, SingleSource }
+import pekko.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
 import pekko.stream.scaladsl.{
   DelayStrategy,
   Source,
@@ -2169,7 +2162,6 @@ private[pekko] object TakeWithin {
       override def onPull(): Unit = pull(in)
 
       @nowarn("msg=Any")
-      @tailrec
       def onFailure(ex: Throwable): Unit = {
         import Collect.NotApplied
         if (maximumRetries < 0 || attempt < maximumRetries) {
@@ -2177,28 +2169,12 @@ private[pekko] object TakeWithin {
             case _: NotApplied.type                                            
                                   => failStage(ex)
             case source: Graph[SourceShape[T] @unchecked, M @unchecked] if 
TraversalBuilder.isEmptySource(source) =>
               completeStage()
-            case source: Graph[SourceShape[T] @unchecked, M @unchecked] =>
-              TraversalBuilder.getValuePresentedSource(source) match {
-                case OptionVal.Some(graph) => graph match {
-                    case singleSource: SingleSource[T @unchecked] => emit(out, 
singleSource.elem, () => completeStage())
-                    case failed: FailedSource[T @unchecked]       => 
onFailure(failed.failure)
-                    case futureSource: FutureSource[T @unchecked] => 
futureSource.future.value match {
-                        case Some(Success(elem)) => emit(out, elem, () => 
completeStage())
-                        case Some(Failure(ex))   => onFailure(ex)
-                        case None                =>
-                          switchTo(source)
-                          attempt += 1
-                      }
-                    case iterableSource: IterableSource[T @unchecked] =>
-                      emitMultiple(out, iterableSource.elements, () => 
completeStage())
-                    case javaStreamSource: JavaStreamSource[T @unchecked, _] =>
-                      emitMultiple(out, javaStreamSource.open().spliterator(), 
() => completeStage())
-                    case _ =>
-                      switchTo(source)
-                      attempt += 1
-                  }
+            case other: Graph[SourceShape[T] @unchecked, M @unchecked] =>
+              TraversalBuilder.getSingleSource(other) match {
+                case OptionVal.Some(singleSource) =>
+                  emit(out, singleSource.elem.asInstanceOf[T], () => 
completeStage())
                 case _ =>
-                  switchTo(source)
+                  switchTo(other)
                   attempt += 1
               }
             case _ => throw new IllegalStateException() // won't happen, 
compiler exhaustiveness check pleaser


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to