Have you considered wrapping your erroneous request in a Try? That way the failure travels through the graph as an ordinary element instead of failing the stage:

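  // also needs: import scala.util.{Failure, Success}
  val (queue: SourceQueueWithComplete[String], source: Source[String, NotUsed]) =
    sourceQueue
        .via(Flow[String].map { num =>
          println(s"flow handling ${num}")
          if (num == "1") {
            // return the failure as a value instead of throwing
            Failure(new Exception())
          } else {
            Success(num)
          }
        })
        .map {
          // unwrap the Try: pass successes through, replace failures with "error"
          case Success(v) => v
          case Failure(ex) => "error"
        }
        .toMat(BroadcastHub.sink(bufferSize = 1))(Keep.both).run()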
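
To show the idea on its own, here is a minimal sketch that uses only the Scala standard library, with a plain List standing in for the stream. The names `TryPerElement`, `handle` and `render` are made up for this illustration and are not part of Akka or of your code; `handle` wraps code that may throw in `Try { ... }`, which is an alternative to constructing `Failure` by hand as above.

```scala
import scala.util.{Failure, Success, Try}

object TryPerElement {
  // Hypothetical stand-in for the stage that may throw (same rule as above: "1" fails).
  // Try { ... } catches the exception and returns it as a Failure value.
  def handle(num: String): Try[String] = Try {
    if (num == "1") throw new Exception("boom") else num
  }

  // Turn each outcome back into a plain String; a failure becomes the marker "error".
  def render(result: Try[String]): String = result match {
    case Success(v) => v
    case Failure(_) => "error"
  }

  def main(args: Array[String]): Unit = {
    // A List stands in for the stream here; both elements are processed.
    val results = List("1", "2").map(handle).map(render)
    println(results) // prints List(error, 2)
  }
}
```

In the graph, the same two functions would presumably be passed to two consecutive `map` stages, so nothing is thrown inside a stage and neither `recover` nor a supervision strategy is involved.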

On Saturday, October 21, 2017 at 7:25:27 AM UTC-6, sub...@gmail.com wrote:
> Hi,
> I'm using a Source.queue with BroadcastHub to implement a pattern where a 
> web request can add an item to the queue, attach to the graph and get a 
> result. The problem I'm stuck on is that I'm not sure how to handle errors 
> without failing the graph. Example:
> Without the supervision strategy I get the following output:
> flow handling 1
> Result 1 Success(Some(error))
> Result 2 Success(None)
> The graph stops after the exception and "2" is never processed
> With the supervision, the graph recovers and processes "2" but never sees 
> "error"
> flow handling 1
> Graph failed java.lang.Exception.. resuming
> Result 1 Failure(java.util.concurrent.TimeoutException: No elements passed 
> in the last 1 second.)
> flow handling 2
> Result 2 Success(Some(2))
> What I'd like to see is 
> Result 1 Success(Some(error))
> Result 2 Success(Some(2))
> Is there a way I can recover the graph on a failure but also see the value 
> returned from the "recover" function? Also, why does the graph still fail 
> when there is a recover? Thanks
> import akka.NotUsed
> import akka.actor.ActorSystem
> import akka.stream._
> import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source, 
> SourceQueueWithComplete}
> import scala.concurrent.Await
> import scala.util.Try
> object QueueGraph extends App {
>   implicit val system = ActorSystem()
>   implicit val materializer = ActorMaterializer()
>   val sourceQueue: Source[String, SourceQueueWithComplete[String]] =
>     Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)
>   val decider: Supervision.Decider = {
>     case e: Exception =>
>       println(s"Graph failed ${e}.. resuming")
>       Supervision.Resume
>   }
>   val (queue: SourceQueueWithComplete[String], source: Source[String, 
> NotUsed]) =
>     sourceQueue
>         .via(Flow[String].map { num =>
>           println(s"flow handling ${num}")
>           if (num == "1") {
>             throw new Exception()
>           } else {
>             num
>           }
>         })
>         .recover{case e: Exception => "error"}
> // If supervisor is removed, the recover catches the exception and returns 
> // "error" but the graph fails and no longer processes anything. With supervisor 
> // "error" is never emitted
> //        .withAttributes(ActorAttributes.supervisionStrategy(decider))
>         .toMat(BroadcastHub.sink(bufferSize = 1))(Keep.both).run()
>   import scala.concurrent.duration._
>   // find error
>   val f = source
>     .filter(_ == "error")
>     .idleTimeout(1.second)
>     .toMat(Sink.headOption)(Keep.right).run()
>   queue.offer("1")
>   println(s"Result 1 ${Try(Await.result(f, Duration.Inf))}")
>   queue.offer("2")
>   val f2 = source
>     .filter(_ == "2")
>     .idleTimeout(1.second)
>     .toMat(Sink.headOption)(Keep.right).run()
>   println(s"Result 2 ${Try(Await.result(f2, Duration.Inf))}")
> }

