Hi

I've defined a flow like this:

def makeAST = Flow[String].log("Parse Json") map { json =>
    Try {
      val jsonMessage = parse(json)
      val jsonNode = asJsonNode(jsonMessage)
      val report = validator.validate(jsonSchema, jsonNode)
      if (!report.isSuccess) throw new Exception("Validation of Json fail")

      jsonMessage
    }
  }



So I need to check whether the JSON is valid: if it is, I continue the flow;
otherwise I need to reply with a 500 error.

To do this with Akka Streams I'm using this:
    val A = builder.add(makeAST.named("makeAST"))
    val B = builder.add(filterPartialFlow(j => j.isSuccess))


where filterPartialFlow is

def filterPartialFlow(filterFunction: T => Boolean) = FlowGraph.create() { implicit b =>
    val bcast = b.add(Broadcast[T](2))
    val filter = b.add(Flow[T] filter (filterFunction(_)))
    val notFilter = b.add(Flow[T] filter (!filterFunction(_)))


    bcast ~> filter
    bcast ~> notFilter


    UniformFanOutShape(bcast.in, filter.outlet, notFilter.outlet)
  }
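
To make the intent concrete, here is a minimal sketch of the same split using plain Scala collections rather than Akka Streams. The `validate` helper and its check are hypothetical stand-ins for my real parsing and schema validation, and `split` only mimics what the two filters behind the broadcast are meant to do: each element ends up on exactly one of the two outputs.

```scala
import scala.util.Try

object SplitSketch {
  // Hypothetical stand-in for parse + schema validation: it only accepts
  // strings that look like a JSON object.
  def validate(json: String): Try[String] = Try {
    val trimmed = json.trim
    if (!(trimmed.startsWith("{") && trimmed.endsWith("}")))
      throw new Exception("Validation of Json fail")
    trimmed
  }

  // Same idea as the filter / notFilter pair: elements matching the
  // predicate go to the first output, all others to the second.
  def split[T](items: Seq[T])(p: T => Boolean): (Seq[T], Seq[T]) =
    items.partition(p)

  def main(args: Array[String]): Unit = {
    val results = Seq("""{"a": 1}""", "not json").map(validate)
    val (valid, invalid) = split(results)(_.isSuccess)
    println(s"valid: ${valid.size}, invalid: ${invalid.size}")
  }
}
```

In the real graph the first output should continue the normal flow and the second should produce the error response.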



When I throw the exception, the element is routed to another flow that does
something like this:

 def sendHTTPErrorBack = Flow[Try[JValue]]
  .mapAsync[HttpResponse](10) { v =>
    Future(HttpResponse(status = StatusCodes.BadRequest, entity = HttpEntity("Invalid Json Error")))
  }.withAttributes(supervisionStrategy(resumingDecider))



In the route I define something like this:

flows.kafkaFlow(json)
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runWith(Sink.head[HttpResponse]): Future[HttpResponse]


The problem is that my response is never sent back. Instead I only get a 500
with the message "There was an internal server error", and this exception:

java.util.NoSuchElementException: empty stream
at akka.stream.impl.HeadSink$HeadSinkSubscriber.onComplete(Sinks.scala:119)
at akka.stream.impl.ReactiveStreamsCompliance$.tryOnComplete(ReactiveStreamsCompliance.scala:104)
at akka.stream.impl.fusing.ActorGraphInterpreter$ActorOutputBoundary.akka$stream$impl$fusing$ActorGraphInterpreter$ActorOutputBoundary$$complete(ActorGraphInterpreter.scala:220)
at akka.stream.impl.fusing.ActorGraphInterpreter$ActorOutputBoundary$$anon$2.onUpstreamFinish(ActorGraphInterpreter.scala:242)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:575)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:511)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$runBatch(ActorGraphInterpreter.scala:399)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:371)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:291)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke_aroundBody0(ActorCell.scala:487)
at akka.actor.ActorCell$AjcClosure1.run(ActorCell.scala:1)
at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)
at akka.kamon.instrumentation.ActorCellInstrumentation$$anonfun$aroundBehaviourInvoke$1.apply(ActorCellInstrumentation.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:57)
at akka.kamon.instrumentation.ActorCellInstrumentation.aroundBehaviourInvoke(ActorCellInstrumentation.scala:61)
at akka.actor.ActorCell.invoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
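
As far as I can tell from the trace, `Sink.head` is seeing the stream complete without ever receiving an element, so no `HttpResponse` reaches the sink on the failure path. The sketch below is only a plain-collections analogy of that failure mode, not Akka code; `responses` is a hypothetical stand-in for whatever the stream emits.

```scala
import scala.util.Try

object EmptyHeadSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for a stream that completes without emitting.
    val responses = List.empty[String]

    // Taking the head of an empty collection throws NoSuchElementException,
    // which is analogous to the "empty stream" failure in the trace.
    val strict = Try(responses.head)
    println(strict.isFailure)

    // headOption turns "nothing arrived" into an explicit value instead.
    println(responses.headOption)
  }
}
```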

I don't know how to change the flow's behavior.

Thanks for your help.
