Hi Stephen,

Wasn’t your requirement to drop the whole sub-list when a failure occurs during 
its processing? This design does not achieve that, because elements emitted 
before the failure have already been passed downstream. You’ll have to buffer 
all events and only release the buffer when the substream completes 
successfully. Something like

  .fold(Vector.empty[Int])(_ :+ _)
  .recover { case _ => Vector.empty }
  .mapConcat(identity)
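
To show where those three stages would sit, here is a minimal sketch built on the flatMapMerge example from the original post. It assumes Akka 2.5 (ActorMaterializer) and a variant of subFlow without the final reduce, so that a sub-list can emit elements before it fails. The names DropFailedSubList and run are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// Hypothetical object name, used only for this sketch.
object DropFailedSubList {

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("DropFailedSubList")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Assumed variant of the original subFlow: same failing future,
    // but no reduce, so elements can be emitted before the failure.
    val subFlow = Flow[Int].mapAsyncUnordered(5)(i => Future {
      if (i == 4) sys.error("☠")
      i * 5
    })

    val result = Source(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6)))
      .flatMapMerge(5, m =>
        Source(m)
          .via(subFlow)
          // Buffer every element of the sub-list; fold only emits
          // once the substream has completed successfully.
          .fold(Vector.empty[Int])(_ :+ _)
          // A failed substream is replaced by an empty buffer.
          .recover { case _ => Vector.empty[Int] }
          // Release the buffered elements (none for a failed sub-list).
          .mapConcat(identity)
      )
      .runWith(Sink.seq)

    try Await.result(result, 10.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With this shape, nothing from Seq(3, 4, 5) should reach the sink, while the elements of the other two sub-lists arrive in no particular order. The cost is that each sub-list is held in memory until its substream completes.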

Regards,

Roland

> On 9 June 2017, at 00:13, Stephen Duncan <stephen.dun...@gmail.com> wrote:
> 
> So, the suggestion is something like this?
> 
>   val subStreamFuture = Source(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6)))
>     .flatMapMerge(5, m => Source.single(m)
>       .mapConcat(identity)
>       .via(subFlow)
>       .map(Success(_))
>       .recover { case t: Throwable => Failure(t) }
>       .collect { case Success(x) => x }
>     )
>     .runWith(Sink.seq)
> 
> (Is there a nicer way to just drop, or log and drop, the failure if that's 
> what's desired? Would it be reasonable to file a feature request for a 
> variant of recover that emits 0 elements, as an equivalent of resuming for 
> stream failure/teardown?)
> 
> Does using withAttributes to set the supervision strategy on the mapAsync 
> stage for that version where there is only one stream seem acceptable (i.e. 
> should be reliable)? I'm finding the distinction between where the exception 
> can be handled by supervision and where it has escaped and we change to 
> teardown propagation mode hard to understand. Other than 
> https://github.com/akka/akka/issues/22880, it seems that supervision-strategy 
> works for failures in a future for mapAsync. If it's not reliable, then I 
> must consider also doing map->recover using Try or Either, and then add 
> collect as the next stream stage.
> 
> 
> 
> On Monday, May 29, 2017 at 10:42:05 PM UTC-7, rkuhn wrote:
> The details here are probably too subtle to be useful as a system design 
> tool: there are two kinds of failure handling at play in this example—a 
> caught exception and propagation of teardown—and only one of them is 
> reasonably controlled by the supervisionStrategy. I would recommend using 
> `.recover()` to turn a failed stream into an explicit element instead of 
> relying upon supervision for non-local failure handling.
> 
> Regards,
> 
> Roland
> 
>> On 29 May 2017, at 15:45, Martynas Mickevičius 
>> <martynas.m...@lightbend.com> wrote:
>> 
>> At first glance, both of these examples should behave the same, 
>> especially since you explicitly set the supervision strategy. Could it be 
>> that a supervision strategy is missing somewhere? Which Akka version did you 
>> use? There have been quite a few changes to how attributes are handled in 
>> Akka 2.5. I wonder if that is going to make a difference.
>> 
>> Otherwise I think this deserves a ticket on the issue tracker.
>> 
>> On Mon, May 29, 2017 at 11:39 AM Stephen Duncan <stephen...@gmail.com> wrote:
>> Before I assume and report this as a bug, I wanted to see if there was 
>> something I was missing regarding the error handling with Akka streams here. 
>> I am surprised to see two different outputs for the two example streams:
>> 
>> import akka.actor.ActorSystem
>> import akka.stream.scaladsl.{Flow, Sink, Source}
>> import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
>> 
>> import scala.collection.immutable.Seq
>> import scala.concurrent.duration._
>> import scala.concurrent.{Await, Future}
>> 
>> object MapMergeConcatError extends App {
>>   implicit val system = ActorSystem("Main")
>>   implicit val materializer = ActorMaterializer()
>>   implicit val ec = system.dispatcher
>> 
>>   val subFlow = {
>>     Flow[Int]
>>       .mapAsyncUnordered(5)(i => Future {
>>         if (i == 4) sys.error("☠")
>>         i * 5
>>       })
>>       .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
>>       .reduce(_ + _)
>>   }
>> 
>>   val subStreamFuture = Source(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6)))
>>     .flatMapMerge(5, m => Source.single(m).mapConcat(identity).via(subFlow))
>>     .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
>>     .runWith(Sink.seq)
>> 
>>   val mapAsyncFuture = Source(Seq(Seq(1, 2), Seq(3, 4, 5), Seq(6)))
>>     .mapAsyncUnordered(5)(m =>
>>       Source.single(m).mapConcat(identity).via(subFlow).runWith(Sink.head))
>>     .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
>>     .runWith(Sink.seq)
>> 
>>   val f1 = Await.ready(subStreamFuture, 10.seconds)
>>   val f2 = Await.ready(mapAsyncFuture, 10.seconds)
>> 
>>   println(s"Using flatMapMerge: $f1")
>>   println(s"Using mapAsyncUnordered: $f2")
>> 
>>   system.terminate()
>> }
>> 
>> The output is:
>> Using flatMapMerge: Future(Failure(java.lang.RuntimeException: ☠))
>> Using mapAsyncUnordered: Future(Success(Vector(30, 15)))
>> The mapAsyncUnordered output is the desired output (the whole item from the 
>> top-level is dropped when there is a failure in the sub-flow, but the other 
>> items that did not have a failure in the sub-flow make it through).
>> 
>> Is this a bug, or something subtle that could be explained?
>> 
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/ <http://akka.io/docs/>
>> >>>>>>>>>> Check the FAQ: 
>> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html 
>> >>>>>>>>>> <http://doc.akka.io/docs/akka/current/additional/faq.html>
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user 
>> >>>>>>>>>> <https://groups.google.com/group/akka-user>
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com <javascript:>.
>> To post to this group, send email to akka...@googlegroups.com <javascript:>.
>> Visit this group at https://groups.google.com/group/akka-user 
>> <https://groups.google.com/group/akka-user>.
>> For more options, visit https://groups.google.com/d/optout 
>> <https://groups.google.com/d/optout>.

