Hi, I've defined a flow like this:

    // Parse and validate each incoming JSON string; failures are captured in the Try
    def makeAST = Flow[String].log("Parse Json") map { json =>
      Try {
        val jsonMessage = parse(json)
        val jsonNode = asJsonNode(jsonMessage)
        val report = validator.validate(jsonSchema, jsonNode)
        if (!report.isSuccess) throw new Exception("Validation of Json fail")
        jsonMessage
      }
    }

So I need to check whether the JSON is valid: if it is, I continue the flow; otherwise I need to reply with a 500 error. To do this with akka-flow I'm using this:

    val A = builder.add(makeAST.named("makeAST"))
    val B = builder.add(filterPartialFlow(j => j.isSuccess))

where filterPartialFlow is

    // T is assumed to be a type parameter of this method
    def filterPartialFlow[T](filterFunction: T => Boolean) = FlowGraph.create() { implicit b =>
      val bcast = b.add(Broadcast[T](2))
      // First outlet: elements that pass the predicate
      val filter = b.add(Flow[T] filter (filterFunction(_)))
      // Second outlet: elements that fail the predicate
      val notFilter = b.add(Flow[T] filter (!filterFunction(_)))
      bcast ~> filter
      bcast ~> notFilter
      UniformFanOutShape(bcast.in, filter.outlet, notFilter.outlet)
    }

When I throw the exception, I arrive at another flow that does something like this:

    def sendHTTPErrorBack = Flow[Try[JValue]]
      .mapAsync[HttpResponse](10) { v =>
        Future(HttpResponse(status = StatusCodes.BadRequest, entity = HttpEntity("Invalid Json Error")))
      }.withAttributes(supervisionStrategy(resumingDecider))

In the route I define something like this:

    flows.kafkaFlow(json)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.head[HttpResponse]): Future[HttpResponse]

The problem is that my response is never sent back. I only get a 500 with the message "There was an internal server error", with this exception:

    java.util.NoSuchElementException: empty stream
        at akka.stream.impl.HeadSink$HeadSinkSubscriber.onComplete(Sinks.scala:119)
        at akka.stream.impl.ReactiveStreamsCompliance$.tryOnComplete(ReactiveStreamsCompliance.scala:104)
        at akka.stream.impl.fusing.ActorGraphInterpreter$ActorOutputBoundary.akka$stream$impl$fusing$ActorGraphInterpreter$ActorOutputBoundary$$complete(ActorGraphInterpreter.scala:220)
        at akka.stream.impl.fusing.ActorGraphInterpreter$ActorOutputBoundary$$anon$2.onUpstreamFinish(ActorGraphInterpreter.scala:242)
        at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:575)
        at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:511)
        at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$runBatch(ActorGraphInterpreter.scala:399)
        at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:371)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:291)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke_aroundBody0(ActorCell.scala:487)
        at akka.actor.ActorCell$AjcClosure1.run(ActorCell.scala:1)
        at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)
        at akka.kamon.instrumentation.ActorCellInstrumentation$$anonfun$aroundBehaviourInvoke$1.apply(ActorCellInstrumentation.scala:62)
        at kamon.trace.Tracer$.withContext(TracerModule.scala:57)
        at akka.kamon.instrumentation.ActorCellInstrumentation.aroundBehaviourInvoke(ActorCellInstrumentation.scala:61)
        at akka.actor.ActorCell.invoke(ActorCell.scala:483)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I don't know how to change the flow's behavior.

Thanks for your help.
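
To make the intended behaviour concrete, independent of Akka Streams, here is a minimal plain-Scala sketch of the branching described above: valid input continues, invalid input is turned into an error response. Everything in it is a hypothetical stand-in, not the real code. `Response` stands in for `HttpResponse`, and `validate` replaces the `parse` / `validator.validate` calls with a trivial brace check. It only illustrates the desired semantics and is not a fix for the graph.

```scala
import scala.util.{Failure, Success, Try}

object ValidationBranchSketch {
  // Hypothetical stand-in for akka-http's HttpResponse.
  final case class Response(status: Int, body: String)

  // Hypothetical stand-in for parse + schema validation:
  // accepts only strings that look like a JSON object.
  def validate(json: String): Try[String] = Try {
    val trimmed = json.trim
    if (!(trimmed.startsWith("{") && trimmed.endsWith("}")))
      throw new Exception("Validation of Json fail")
    trimmed
  }

  // Every input produces exactly one response: the success branch
  // continues, the failure branch becomes an error reply.
  def respond(json: String): Response = validate(json) match {
    case Success(_) => Response(200, "OK")
    case Failure(_) => Response(400, "Invalid Json Error")
  }

  def main(args: Array[String]): Unit = {
    println(respond("""{"a": 1}"""))
    println(respond("not json"))
  }
}
```

The property worth noting is that each input maps to exactly one `Response`. `Sink.head` fails with `NoSuchElementException` when the stream completes without emitting anything, so the graph needs the same guarantee on both branches.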