Hello!

I'm very inspired of Reactive streams idea and akka implementation. 
But i found some discourage fact in 1.0-M4: Why is the stream supervisor 
works only for one exception and other exceptions ignores?

Example1 with one bad element in the stream:

> val decider: Supervision.Decider = exception => exception match {
>   case _: ArithmeticException => Supervision.Resume
>   case _: NumberFormatException => Supervision.Resume
>   case _ => Supervision.Stop
> }
>
> implicit val system = ActorSystem("system")
>
> implicit val materializer = ActorFlowMaterializer(
>   ActorFlowMaterializerSettings(system)
>     .withSupervisionStrategy(decider)
> )
>
> val source = Source(Vector("1a", "2", "3", "4")).map( _.toInt )
> val result = source.runWith(Sink.foreach(i => println(i)))
>
> in this example, everything was fine and console output was "2,3,4"

Example2 with more than one bad elements in the stream:

> val decider: Supervision.Decider = exception => exception match {
>   case _: ArithmeticException => Supervision.Resume
>   case _: NumberFormatException => Supervision.Resume
>   case _ => Supervision.Stop
> }
>
> implicit val system = ActorSystem("system")
>
> implicit val materializer = ActorFlowMaterializer(
>   ActorFlowMaterializerSettings(system)
>     .withSupervisionStrategy(decider)
> )
>
> val source = Source(Vector("1a", "2a", "3a", "2", "3", "4")).map( _.toInt )
> val result = source.runWith(Sink.foreach(i => println(i)))
>
> in this exampe, stream raised error:

> 14:11:24.922 [system-akka.actor.default-dispatcher-6] ERROR 
>> akka.actor.OneForOneStrategy - For input string: "2a"
>
> java.lang.NumberFormatException: For input string: "2a"
>
> at 
>> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>>  
>> ~[na:1.7.0_75]
>
> at java.lang.Integer.parseInt(Integer.java:492) ~[na:1.7.0_75]
>
> at java.lang.Integer.parseInt(Integer.java:527) ~[na:1.7.0_75]
>
> at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:247) 
>> ~[scala-library-2.11.5.jar:na]
>
> at scala.collection.immutable.StringOps.toInt(StringOps.scala:30) 
>> ~[scala-library-2.11.5.jar:na]
>
> at Application$$anonfun$3.apply(Application.scala:23) ~[classes/:na]
>
> at Application$$anonfun$3.apply(Application.scala:23) ~[classes/:na]
>
> at akka.stream.impl.fusing.Map.onPush(Ops.scala:16) 
>> ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
>
> at 
>> akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:296)
>>  
>> ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
>
> at 
>> akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:206)
>>  
>> ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
>
> at 
>> akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:294)
>>  
>> ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
>
> at 
>> akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:416)
>>  
>> ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
>
> at 
>> akka.stream.impl.fusing.OneBoundedInterpreter$$anon$6.push(Interpreter.scala:498)
>>  
>> ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
>
> at 
>> akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:137)
>>  
>> ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
>
> at 
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>  
>> ~[scala-library-2.11.5.jar:na]
>
> at akka.stream.impl.SubReceive.apply(Transfer.scala:16) 
>> ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
>
> at akka.stream.impl.SubReceive.apply(Transfer.scala:12) 
>> ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
>> ~[scala-library-2.11.5.jar:na]
>
> at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12) 
>> ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
>> ~[scala-library-2.11.5.jar:na]
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
>> ~[akka-actor_2.11-2.3.9.jar:na]
>
> at 
>> akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:282)
>>  
>> ~[akka-stream-experimental_2.11-1.0-M4.jar:na]
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
>> [akka-actor_2.11-2.3.9.jar:na]
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
>> [akka-actor_2.11-2.3.9.jar:na]
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 
>> [akka-actor_2.11-2.3.9.jar:na]
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:221) 
>> [akka-actor_2.11-2.3.9.jar:na]
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 
>> [akka-actor_2.11-2.3.9.jar:na]
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
>> [scala-library-2.11.5.jar:na]
>
> at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>  
>> [scala-library-2.11.5.jar:na]
>
> at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
>> [scala-library-2.11.5.jar:na]
>
> at 
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>  
>> [scala-library-2.11.5.jar:na]
>
>  
May be i must use some specific high-level actors or something else? Any 
suggestion, please.




 

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to