Hello,

I'm using Source.actorRef[...] for a unknown size but bounded stream (I 
don't know how many elements but I do know it will end). The high level 
outline of the stream is below and sample code can be found in this gist 
<https://gist.github.com/gauthierbl/9cf40135619f4c99c102>.
Client1->ActorRef/Source[Message]->parseFlow[Message, String]->threadSink[
String]->"materialize[Result]"->Client2

My goal is to have the stream be populated via messages to an actor and 
then when the stream is "done" have a way to materialize a result to a 
different part of the application. Input is one client's concern, output is 
another client's concern.

Per the documention 
<http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC4/scala/stream-integrations.html>
 for 
 Source.actorRef[...]:  

> The stream can be completed successfully by sending akka.actor.PoisonPill
>  or akka.actor.Status.Success to the actor reference.
>
> The stream can be completed with failure by sending 
> akka.actor.Status.Failure to the actor reference.
>

In both cases it pretty clearly states that akka.actor.Status.Success will 
end the stream with success. However when I send in  Status.Success I get a 
ClassCastException 
from the parseFlow. The parsFlow is expecting a Message but its getting a 
Status.Success. 

I also get ClassCastException for Status.Failure. The PoisonPill stops the 
stream immediately and no results are gathered.

I'm not sure why this exception is being thrown. I'm very new to 
akka-streams so I can't tell if is an issue with my usage or I 
am misinterpreting the documentation. I would appreciate any help.

Exception and program output is below, code is here 
<https://gist.github.com/gauthierbl/9cf40135619f4c99c102>.
 Flow Pre Parse: Message(hello)
 Flow Pre Parse: Message(world)
Flow Post Parse: HELLO
 Flow Pre Parse: Message(this)
Flow Post Parse: WORLD
 Flow Pre Parse: Message(is)
Flow Post Parse: THIS
 Flow Pre Parse: Message(a)
Flow Post Parse: IS
           Sink: HELLO
 Flow Pre Parse: Message(test)
           Sink: HELLO , WORLD
Flow Post Parse: A
           Sink: HELLO , WORLD , THIS
Flow Post Parse: TEST
           Sink: HELLO , WORLD , THIS , IS
           Sink: HELLO , WORLD , THIS , IS , A
           Sink: HELLO , WORLD , THIS , IS , A , TEST
[ERROR] [06/29/2015 16:17:49.101] [default-akka.actor.default-dispatcher-9] 
[akka.dispatch.Dispatcher] akka.actor.Status$Success$ cannot be cast to 
com.example.Test2$Message
java.lang.ClassCastException: akka.actor.Status$Success$ cannot be cast to 
com.example.Test2$Message
at com.example.Test2$$anonfun$3.apply(Test2.scala:39)
at akka.stream.impl.fusing.Map.onPush(Ops.scala:23)
at akka.stream.impl.fusing.Map.onPush(Ops.scala:22)
at 
akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:436)
at 
akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:245)
at 
akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:434)
at 
akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580)
at 
akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(Interpreter.scala:241)
at 
akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(Interpreter.scala:666)
at akka.stream.stage.AbstractStage.enterAndPull(Stage.scala:74)
at 
akka.stream.impl.fusing.ActorOutputBoundary.akka$stream$impl$fusing$ActorOutputBoundary$$tryPutBallIn(ActorInterpreter.scala:217)
at 
akka.stream.impl.fusing.ActorOutputBoundary$$anonfun$downstreamRunning$1.applyOrElse(ActorInterpreter.scala:305)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.stream.impl.SubReceive.applyOrElse(Transfer.scala:12)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
akka.stream.impl.fusing.ActorInterpreter.aroundReceive(ActorInterpreter.scala:366)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks for your help,
--Brandon

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to