Hi Konrad,

thanks a lot, your answer does help a lot.

- My bad, the onError from MongoDb publisher returns normally but passes an 
Exception object, their implementation seems to be correct according to the 
reactive stream spec.
- Ok, so the supervision does not work for the source created via 
fromPublisher(), is there a way to catch this propagated failure somewhere 
else in the stream ? Would it be better to wrap the publisher in an actor ?

Antonin PA

Le mardi 21 février 2017 16:37:37 UTC+1, Konrad Malawski a écrit :
> Hi Antonin,
> Two things here:
> One: "throws an error on the onError callback"? This is not allowed by the 
> spec:
> Calling onSubscribe, onNext, onError or onComplete MUST return normally 
> except when any provided parameter is null in which case it MUST throw a 
> java.lang.NullPointerException to the caller, for all other situations 
> the only legal way for a Subscriber to signal failure is by cancelling 
> its Subscription. In the case that this rule is violated, any associated 
> Subscription to the Subscriber MUST be considered as cancelled, and the 
> caller MUST raise this error condition in a fashion that is adequate for 
> the runtime environment.
> https://github.com/reactive-streams/reactive-streams-jvm#2.13
> Is that mongo implementation surely tested and conforming to the reactive 
> streams spec?
> Two: Supervision does not work for arbitrary 3rd party publishers - it 
> only works within Akka Streams (specific stages, specifically handle it), 
> it's an additional feature Akka Streams provide over what Reactive Streams 
> do.
> Hope this helps
> -- 
> Konrad `ktoso` Malawski
> Akka <http://akka.io> @ Lightbend <http://lightbend.com>
> On 22 February 2017 at 00:04:06, antonin perrot-audet (perro...@gmail.com 
> <javascript:>) wrote:
> Hello,
> has anyone succeded at having a source fromPublisher() conform to the 
> SupervisionStrategy defined in the ActorMaterializer ?
> I have that publisher  :
> MongoClients.create(mongoSettings).listDatabaseNames()
> That throws an exception on the onError callback, but the errorDecider 
> never gets called. The stream colapses on a failed Future :
>     implicit val system2 = ActorSystem("Sys2")
>     import system2.dispatcher
>     val errorDecider : Supervision.Decider = {
>       case _ => {
>         println("errorDecider does something, yeiii !")
>         Supervision.stop}
>     }
>     implicit val materializer = ActorMaterializer(
> ActorMaterializerSettings(system2)
>       .withSupervisionStrategy(errorDecider))
>     val publiString : Publisher[String] = MongoClients.create(
> mongoSettings).listDatabaseNames()
>     val stream: Source[String, NotUsed] = Source.fromPublisher(publiString
> )
>     stream.map( {s => println(s);s}).runWith(Sink.ignore).onComplete(_ => 
> system2.terminate())
> thanks in advace for your responses.
> Best,
> Antonin PA
Reply via email to