Re: [akka-user] Re: Akka Remoting fails for large messages
Increasing the acceptable-heartbeat-pause for the various failure detectors (transport, remote watch, cluster) is a workaround, with slower failure detection as the drawback.

The problem is that while you are sending this large message, no other message can pass between those nodes. That includes the heartbeat messages used by the failure detector.

Akka remoting is not designed for sending large messages. Several alternatives have already been suggested, and yet another approach would be to use Akka IO http://doc.akka.io/docs/akka/2.3.4/scala/io.html for sending the large messages.

/Patrik

On Wed, Aug 13, 2014 at 4:07 AM, Syed Ahmed sbua2...@gmail.com wrote:

Just to try things out, since I was seeing the handshake exception: by increasing transport-failure-detector.acceptable-heartbeat-pause to 120s I was able to send a 400MB message in about 67 secs.

---
[DEBUG] [08/12/2014 19:00:38.884] [LocalNodeApp-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://LocalNodeApp)] Using serializer[akka.remote.serialization.ProtobufSerializer] for message [org.opendaylight.controller.cluster.datastore.messages.Messages$LargeMessage]
sending of large message done at 1407895238883
[INFO] [08/12/2014 19:01:45.349] [LocalNodeApp-akka.actor.default-dispatcher-4] [akka://LocalNodeApp/user/$a] Message from Server: time taken to process66459
---

That might not be a valid value to set, as we want remote failure detection to be quicker. Reading previous posts, it looks like 20s is a reasonable value for this. Can this be done programmatically?

Thx

On Friday, August 8, 2014 7:02:41 AM UTC-7, Konrad Malawski wrote:

Hello Syed,

So the reason we wanted to verify 2.3.4 is that we have implemented a separate way to propagate heartbeats in that version; it shouldn't disconnect nodes under load from the cluster as easily.
The topic discussed here has come up a few times in the last few weeks, refer to:

arbitrary and unpredictably large messages to a remote actor https://groups.google.com/forum/#!searchin/akka-user/large$20messages/akka-user/HXmpLAMWmjA/I3MTsGlLMIIJ

Large messages vs. distributed file system https://groups.google.com/forum/#!searchin/akka-user/large$20messages/akka-user/gO3J29ebJXU/kO2e7iXaYKkJ - my fav. suggestion at this point.

You can also enable debug logging and remote lifecycle events https://groups.google.com/forum/#!searchin/akka-user/large$20messages/akka-user/I5xcmDF5X00/v2Q7hLcPCuQJ, so we have more clues on what's actually happening.

On Thu, Aug 7, 2014 at 11:30 PM, Syed Ahmed sbua...@gmail.com wrote:

FYI - Just tried with 2.3.4 and it doesn't change the behavior.

On Thursday, August 7, 2014 10:38:38 AM UTC-7, Syed Ahmed wrote:

Hi Ryan,

In my test it's a very large string and I just get the string back from the message (i.e. it gets deserialized). I'm not doing anything further. This test is just to check how large a message can be sent across. I will try again with 2.3.4 to see if it makes a difference.

thx -syed

On Wednesday, August 6, 2014 3:12:27 PM UTC-7, Ryan Tanner wrote:

When those large messages are received, what's supposed to happen? It's possible that whatever processing is triggered by that message is so CPU intensive that Akka's internal remoting can't get any time for its own messages.

On Wednesday, August 6, 2014 12:08:20 PM UTC-6, Syed Ahmed wrote:

Hello,

I'm new to Akka/Akka Remoting. We need to send large messages, on the order of 500-600MB. After checking some mailing list posts I found that I have to change the Akka Remote netty tcp settings, which I did, and my application.conf has the following...
    remote {
      transport = akka.remote.netty.NettyRemoteTransport
      netty.tcp {
        hostname = ip address of host
        port = 2553
        maximum-frame-size = 524288000
        send-buffer-size= 5
        receive-buffer-size=5
      }
    }

I create a client and a remote server app using akka-remoting. Using protobuf message serialization, a client sends a large message to the server, and the server acknowledges with the timestamp at which it received and deserialized the message. With the above settings things work fine for messages up to 200MB. When I try a slightly larger message, 250MB, the server shows the following:

---
[INFO] [08/05/2014 23:41:39.302] [RemoteNodeApp-akka.remote.default-remote-dispatcher-4] [akka.tcp://RemoteNodeApp@remote-ip:2552/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FRemoteNodeApp%40remote_ip%3A36922-1] No response from remote. Handshake timed out or transport failure detector triggered.
---

and the client shows the following

[WARN] [08/05/2014 23:41:39.326] [LocalNodeApp-akka.remote.default-remote-dispatcher-6] [akka.tcp://LocalNodeApp@client_ip:2553/system/endpointManager/
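
One of the alternatives to a single 500MB remote message is to split the payload into bounded chunks and reassemble it on the receiving side, so that small messages such as heartbeats can get through between chunks. The following is a minimal plain-JDK sketch of the split and reassemble steps only. The class `ChunkedPayload`, its method names and the 1 MiB chunk size are hypothetical and not part of Akka, and the actual transport (Akka IO, a file server, and so on) is deliberately left out.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not an Akka API: splits a payload into bounded chunks
// and puts the chunks back together in order.
final class ChunkedPayload {
    private ChunkedPayload() {}

    /** Splits data into consecutive chunks of at most chunkSize bytes. */
    static List<byte[]> split(byte[] data, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        int from = 0;
        while (from < data.length) {
            int len = Math.min(chunkSize, data.length - from);
            chunks.add(Arrays.copyOfRange(data, from, from + len));
            from += len;
        }
        return chunks;
    }

    /** Concatenates the chunks in list order, restoring the original payload. */
    static byte[] join(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = new byte[5 * 1024 * 1024 + 123];
        List<byte[]> chunks = split(payload, 1024 * 1024);
        System.out.println(chunks.size() + " chunks, round trip ok: "
                + Arrays.equals(payload, join(chunks)));
    }
}
```

Each chunk would then be sent as its own message, ideally with a sequence number and an acknowledgement per chunk. That part depends on the transport chosen and is not shown here.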
Re: [akka-user] [akka-stream] Delayed retry
On Wed, Aug 13, 2014 at 9:01 AM, Evgeniy Ostapenko sml...@gmail.com wrote:

:-) But the implementation of Producer could be from N different libraries.

Making a given type of Producer concrete can be useful :)

What do you mean?

I'm still not sure what and who needs what properties and why it is useful, do you have a concrete example?

I have started a simple project which uses stream settings. For now it is only a proof of concept, but I think it can serve as a concrete example. And I am sorry for the project name, I will rename it anyway. I am not sure whether I need private[akka] classes in that project or not, so the package name is akka for now.

So if I understand your example, it is useful for things like your particular library, which adds features that were not intended for akka-streams? I'm not sure that's a good enough selling point for the feature request.

https://github.com/smlin/akka-rx

I am sorry for PR :)

--
Cheers, √

--
Read the docs: http://akka.io/docs/
Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] reading and writing moderately sized xml (and other) files
On Tue, Aug 12, 2014 at 8:27 AM, Tim Pigden tim.pig...@optrak.com wrote:

My application occasionally needs to read and write files. Specifically, I've got a file store of data project files, each of which is maybe 10 MB of XML with possibly an accompanying 2-3 MB of CSV. The XML is currently being read with Scala's XML.loadFile(fileName), and the CSV is loaded into an input stream, parsed with a Java library and then processed with various tools. Likewise, the write (XML only) is just a straight write.

It doesn't happen *a lot*, maybe 40-50 loads per day, because once the files are loaded the data is managed through akka persistence, so the XML is the initial state (or starting snapshot). In test, it's working fine. It's not going to scale to 100s or 1000s of loads per day in the foreseeable future, and on my home computer at least, loading takes less than a second.

1. should I even bother to convert this to some non-blocking protocol?

You should use a dedicated dispatcher for these blocking actors, but if you have a limited number of them and the load is not too high, blocking IO might be just fine.

2. If so, is there some pre-existing code to do simple files? I see stuff for TCP/IP but nothing for files.

Inspiration can be found here: https://github.com/drexin/akka-io-file

/Patrik

Tim

--
Patrik Nordwall
Typesafe http://typesafe.com/ - Reactive apps on the JVM
Twitter: @patriknw
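
Patrik's advice to keep blocking file IO on a dedicated dispatcher can be illustrated outside Akka with a plain JDK thread pool. This is only an analogy under stated assumptions: `BlockingFileLoader`, the pool size and the thread name are hypothetical, and in an Akka application the equivalent would be a separately configured dispatcher assigned to the blocking actors rather than a hand-made executor.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: runs blocking file reads on their own small pool so
// they cannot starve the threads that do the non-blocking work.
final class BlockingFileLoader implements AutoCloseable {
    private final ExecutorService blockingPool;

    BlockingFileLoader(int threads) {
        this.blockingPool = Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r, "blocking-io");
            t.setDaemon(true);
            return t;
        });
    }

    /** Reads the whole file on the dedicated pool and completes the future with its text. */
    CompletableFuture<String> load(Path file) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, blockingPool);
    }

    @Override
    public void close() {
        blockingPool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("project", ".xml");
        Files.write(tmp, "<project/>".getBytes(StandardCharsets.UTF_8));
        try (BlockingFileLoader loader = new BlockingFileLoader(4)) {
            System.out.println(loader.load(tmp).get());
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

With 40-50 loads per day a pool this small is more than enough. The point is only that the blocking calls are isolated from everything else.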
Re: [akka-user] [akka-stream] Delayed retry
On Wed, Aug 13, 2014 at 11:06 AM, Evgeniy Ostapenko sml...@gmail.com wrote:

On Wednesday, August 13, 2014, 12:05:48 UTC+4, √ wrote:

On Wed, Aug 13, 2014 at 9:01 AM, Evgeniy Ostapenko sml...@gmail.com wrote:

:-) But the implementation of Producer could be from N different libraries.

Making a given type of Producer concrete can be useful :)

What do you mean?

```
val producer: Producer[_] = getUnknownProducerFromOtherLibrary..
val concreteProducer: Producer[_] = ProducerWithKnownSettings(producer, settings)

// or

implicit class RichProducer[T](producer: Producer[T]) {
  def withSettings(settings: StreamSettings): Producer[T] =
    ProducerWithKnownSettings(producer, settings)
}
val concreteProducer = producer withSettings StreamSettings(exceptionHandler = log.error(..., _))
```

But you can do that already, it's perfectly possible to attach a Processor to a Producer from some other library and expose it as a Producer with more info to others.

I'm still not sure what and who needs what properties and why it is useful, do you have a concrete example?

I have started a simple project which uses stream settings. For now it is only a proof of concept, but I think it can serve as a concrete example. And I am sorry for the project name, I will rename it anyway. I am not sure whether I need private[akka] classes in that project or not, so the package name is akka for now.

So if I understand your example, it is useful for things like your particular library, which adds features that were not intended for akka-streams? I'm not sure that's a good enough selling point for the feature request.

Yes, a lot of the features in my library are not intended for akka-streams. But I think preserving consistency inside zip, and sometimes after merge transformations, is important for akka-streams as well.

In what way is zip not consistent? (merge is inconsistent by design, and concat is consistent by design)

Otherwise, from the user side, you must always remember about consistency and work with those transformations very carefully.

Also, a few settings can be handled effectively in low-level code. For example, instead of wrapping each transformation in a Try block on top of akka-streams, it is possible to wrap only the next-event generator.

Hmmm, do you have an example of that (code)?

And thank you for your answers again :)

You're most welcome, thanks for asking questions :)

https://github.com/smlin/akka-rx

I am sorry for PR :)

--
Cheers, √
Re: [akka-user] Handling Exception in Akka
What is the question?

If you are looking for a way to reply to the sender in case of failure, instead of letting the ask time out, you need to catch the exception in the EmailServiceWorker and reply from the actor. An alternative would be to reply from the supervisorStrategy in the parent actor, but that requires that you keep track of the original sender there.

Regards, Patrik

On Tue, Aug 12, 2014 at 7:58 AM, soumya sengupta maverick.sou...@gmail.com wrote:

I have a master worker

    public class EmailServiceActor extends UntypedActor {
        ActorRef actorRef;

        // Restart the child on IOException, stop it on any other Exception, escalate the rest.
        private static SupervisorStrategy strategy =
            new OneForOneStrategy(3, Duration.create("1 minute"),
                new Function<Throwable, Directive>() {
                    @Override
                    public Directive apply(Throwable t) {
                        if (t instanceof IOException) {
                            System.out.println("IO Exception occurred");
                            return restart();
                        } else if (t instanceof Exception) {
                            return stop();
                        } else {
                            return escalate();
                        }
                    }
                });

        @Override
        public void onReceive(Object message) {
            System.out.println("Received .");
            if (message instanceof MyLocalMessage) {
                System.out.println("received instr from EmailServiceWorker by EmailServiceActor");
                actorRef.tell(message, self());
            } else {
                actorRef = getSender();
                System.out.println("received instr by EmailServiceActor from Application");
                getContext().actorOf(Props.create(EmailServiceWorker.class), "EmailServiceWorker").tell(message, self());
            }
        }

        @Override
        public SupervisorStrategy supervisorStrategy() {
            return strategy;
        }

        @Override
        public void preStart() {
            System.out.println("Pre Restart...");
        }
    }

I also have a child worker

    public class EmailServiceWorker extends UntypedActor {
        @Override
        public void onReceive(Object message) throws IOException {
            System.out.println("received instr by EmailServiceWorker");
            System.out.println("Sending mail");
            FileReader reader = new FileReader("someFile");
            MyLocalMessage myLocalMessage = new MyLocalMessage("Hello");
            getSender().tell(myLocalMessage, getSelf());
            getContext().stop(getSelf());
        }

        @Override
        public void preStart() {
            System.out.println("Pre Restart");
        }
    }

There is no such file as someFile. I am deliberately trying to raise the error. But when I run the application I get the error

    [info] play - Application started (Dev)
    Pre Restart...
    Received .
    received instr by EmailServiceActor from Application
    Pre Restart
    received instr by EmailServiceWorker
    Sending mail
    IO Exception occurred
    Pre Restart
    [ERROR] [08/12/2014 10:53:13.844] [play-akka.actor.default-dispatcher-5] [akka://play/user/EmailServiceActor/EmailServiceWorker] someFile (No such file or directory)
    java.io.FileNotFoundException: someFile (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:146)
        at java.io.FileInputStream.<init>(FileInputStream.java:101)
        at java.io.FileReader.<init>(FileReader.java:58)
        at actors.EmailServiceWorker.onReceive(EmailServiceWorker.java:18)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    [error] play - Cannot invoke the action, eventually got an error: akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka://play/), Path(/user/EmailServiceActor)]] after [19 ms]
    [error] application - ! @6j77j81fk - Internal server error, for (GET) [/sendmail] -
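
Patrik's first suggestion, catching the exception where it occurs and replying with the failure so the caller does not have to wait for a timeout, can be sketched with a plain `CompletableFuture`. This is a JDK-only analogy, not Akka code: `MailWorker` and `send` are hypothetical stand-ins for the worker actor and its message handling. In the actor version the `catch` branch would instead send a failure message back to the sender (for example wrapped in `akka.actor.Status.Failure`, which is how an `ask` future is usually failed).

```java
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the worker: it always answers, either with a
// result or with the exception, so the caller never has to hit a timeout.
final class MailWorker {
    private MailWorker() {}

    /** Tries to open the file; on failure the reply is failed instead of left unanswered. */
    static CompletableFuture<String> send(String templateFile) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        try (FileReader reader = new FileReader(templateFile)) {
            reply.complete("Hello");
        } catch (IOException e) {
            // The caller learns about the failure right away.
            reply.completeExceptionally(e);
        }
        return reply;
    }

    public static void main(String[] args) {
        send("someFile")
            .handle((ok, err) -> err == null ? "sent: " + ok : "failed: " + err.getMessage())
            .thenAccept(System.out::println);
    }
}
```

The design choice is the same in both worlds: a failure that is reported as a reply is visible to the requester immediately, whereas a failure that only reaches the supervisor leaves the requester waiting.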
Re: [akka-user] Lost bytes with the akka stream TcpEcho example
Hi William,

pasting your code so I can comment.

    // the client connects to the server gets the 6Mb back .. or almost 6Mb :/ !
    def client(system: ActorSystem, serverAddress: InetSocketAddress): Unit = {
      implicit val sys = system
      implicit val ec = system.dispatcher
      val settings = MaterializerSettings()
      val materializer = FlowMaterializer(settings)
      implicit val timeout = Timeout(5.seconds)

      val clientFuture = (IO(StreamTcp) ? StreamTcp.Connect(settings, serverAddress))
      clientFuture.onSuccess {
        case clientBinding: StreamTcp.OutgoingTcpConnection =>
          var count = 0
          val decodingBuffer = new ArrayBuffer[Byte]()
          Flow(clientBinding.inputStream)
            .map(bytes => {
              // THIS IS NOT SAFE: you are mutating a closed-over non-volatile var and Akka Streams are concurrent
              count += bytes.size
              // if you comment the next 3 lines, you'll get the 6Mb. If not, some data is lost
              val array = new Array[Byte](bytes.size)
              // THIS IS NOT SAFE: you are mutating a closed-over non-threadsafe data structure from within a concurrent application
              decodingBuffer ++= array
              // What is this line intending to achieve?
              val bytebuffer = ByteBuffer.wrap(decodingBuffer.toArray)
            })
            .onComplete(materializer) {
              case Success(_) => {
                // THIS IS NOT SAFE: you are reading a closed-over non-volatile var and Akka Streams are concurrent
                println(f"we received $count bytes")
              }
              case Failure(e) =>
            }
      }
      clientFuture.onFailure {
        case e: Throwable =>
          println(s"Client could not connect to $serverAddress: ${e.getMessage}")
          system.shutdown()
      }
    }

Does this code work better? (Warning: I didn't compile it but you'll get the idea)

    def client(system: ActorSystem, serverAddress: InetSocketAddress): Unit = {
      implicit val sys = system
      import sys.dispatcher
      val settings = MaterializerSettings()
      val materializer = FlowMaterializer(settings)
      implicit val timeout = Timeout(5.seconds)

      (IO(StreamTcp) ? StreamTcp.Connect(settings, serverAddress)) onComplete {
        case Success(clientBinding: StreamTcp.OutgoingTcpConnection) =>
          Flow(clientBinding.inputStream)
            .fold(0L)((count, bytes) => count + bytes.size)
            .toFuture(materializer)
            .foreach { count => println(f"we received $count bytes") }
        case Failure(e) =>
          println(s"Client could not connect to $serverAddress: ${e.getMessage}")
          system.shutdown()
      }
    }

On Wed, Aug 13, 2014 at 2:18 AM, William Le Ferrand warne...@gmail.com wrote:

Dear List,

I'm facing a puzzling issue with (a slightly modified version of) the example code TcpEcho.scala. I'm sending 6 MB from the server to the client:

- when I only count the bytes received, I get the right byte count back (6M)
- when I do something as simple as allocating an array inside the operation applied to each bytestring, I start losing bytes

I reduced the issue to a minimal code example: https://gist.github.com/williamleferrand/77b4e40787b593d1f00d . If you run it, you'll see that you don't get exactly 6M bytes back.

Would anyone have a clue why I'm losing these bytes?

Thanks in advance,

Best, William

--
Cheers, √
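
The idea behind the `fold` rewrite in this reply, carrying the running total through the pipeline instead of mutating a variable that lives outside it, is not specific to Akka Streams. As a rough illustration only, here is the same principle with a parallel `java.util.stream` pipeline; `ByteCounting` and `totalBytes` are hypothetical names, and `java.util.stream` is of course a different library from the akka-stream API used in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example: counts received bytes by reduction, so no state
// outside the pipeline is written from several threads.
final class ByteCounting {
    private ByteCounting() {}

    /** Sums the chunk sizes; safe even though the stream runs in parallel. */
    static long totalBytes(List<byte[]> chunks) {
        return chunks.parallelStream()
                .mapToLong(chunk -> chunk.length)
                .sum();
    }

    public static void main(String[] args) {
        List<byte[]> chunks = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            chunks.add(new byte[1_000_000]);
        }
        // prints "we received 6000000 bytes"
        System.out.println("we received " + totalBytes(chunks) + " bytes");
    }
}
```

Incrementing a plain `long` field from inside the lambda would compile in a similar shape but could lose updates under parallel execution, which is the class of problem flagged by the NOT SAFE comments.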
Re: [akka-user] Passing ActorRef or better solution?
You can send messages directly to the soldier actors. The ActorRefs will be valid until the actors are stopped. I don't know enough about your domain/design to give any precise advice on whether that is better than sending messages via the parent. An advantage of sending everything via the parent is that it could manage the lifecycle of the children based on incoming messages, e.g. create them when needed, and stop them if they are not used any more.

/Patrik

On Tue, Aug 12, 2014 at 11:19 AM, 09goral gora...@gmail.com wrote:

I am developing a war simulation, where two armies are fighting each other. In my app each Actor 'contains' a soldier field. Each soldier has something like a visibility range (an NxN matrix). I have been reading about ordering in distributed systems, vector clocks etc., and I can't wrap my head around it or find a neat solution yet. But my question is different. Is it OK for, let's say, Soldier1 from army A to receive from *WorldActor* the ActorRefs of the enemies that are in his visibility range and *talk to/attack* them directly, or is it better to send my decision message to *WorldActor* and let it make all the calls?

https://lh6.googleusercontent.com/-kzGeYtJJJH0/U-nbnnL-WII/DG0/La7Zx1_aYS0/s1600/diagram1.png

--
Patrik Nordwall
Typesafe http://typesafe.com/ - Reactive apps on the JVM
Twitter: @patriknw
Re: [akka-user] Lost bytes with the akka stream TcpEcho example
There is a bug in Akka Tcp which causes this. I don't remember the ticket number, but it is fixed and will be part of the next Akka bugfix release; streams will then be fixed at the next release.

On 2014.08.13. 7:37, William Le Ferrand warne...@gmail.com wrote:

Dear List,

I'm facing a puzzling issue with (a slightly modified version of) the example code TcpEcho.scala. I'm sending 6 MB from the server to the client:

- when I only count the bytes received, I get the right byte count back (6M)
- when I do something as simple as allocating an array inside the operation applied to each bytestring, I start losing bytes

I reduced the issue to a minimal code example: https://gist.github.com/williamleferrand/77b4e40787b593d1f00d . If you run it, you'll see that you don't get exactly 6M bytes back.

Would anyone have a clue why I'm losing these bytes?

Thanks in advance,

Best, William
Re: [akka-user] relaying messages to specific actors
On Tue, Aug 12, 2014 at 12:08 PM, Björn Raupach raupach.bjo...@googlemail.com wrote:

Dear group,

at the moment I am looking into akka for back-end batch processing. I know akka is a toolkit, but I am a bit confused by all the choices akka is offering me.

The general idea is simple. One actor (producer) polls a message queue on a regular basis and then relays the messages to specific actors (consumers). The messages themselves are just plain JSON with a type property that denotes the specific actor that can handle the job.

The relay is my problem. What is a good approach?

* Should the producer look into the message, switch over the type and tell the correct consumer?

I would start with this approach. /Patrik

* Should I use a BroadcastGroup and let the consumer decide if it wants to handle the message?
* Should I use the event bus?

Some more details: our consumers are heavy on IO, are expected to block and might take a while to finish. The consumers themselves fall into several logical groups, so I would really like to have supervision trees. If one crashes, just restart it. It is not important whether the job finishes or not. We are also a Java shop, so unfortunately no Scala (at the moment).

I guess there is no best solution, but some pointers from more experienced folks would be highly appreciated.

Thanks in advance!

--
Patrik Nordwall
Typesafe http://typesafe.com/ - Reactive apps on the JVM
Twitter: @patriknw
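
The approach Patrik recommends starting with, letting the producer inspect the type property and hand the message to the matching consumer, boils down to a lookup table from type to handler. Below is a minimal plain-Java sketch of that dispatch step. `TypeRelay`, its methods and the type names are hypothetical; in the actor version the map values would be the consumers' ActorRefs and `accept` would become a `tell`. JSON parsing is left out, so the type is passed in as an already extracted string.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical relay: looks at the message type and hands the payload to
// the consumer registered for that type.
final class TypeRelay {
    private final Map<String, Consumer<String>> consumers = new HashMap<>();

    /** Registers the consumer responsible for one message type. */
    void register(String type, Consumer<String> consumer) {
        consumers.put(type, consumer);
    }

    /** Returns true if a consumer was registered for the type and received the payload. */
    boolean relay(String type, String payload) {
        Consumer<String> consumer = consumers.get(type);
        if (consumer == null) {
            return false; // unknown type: the caller decides whether to log or drop
        }
        consumer.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        TypeRelay relay = new TypeRelay();
        relay.register("resize-image", p -> System.out.println("resizing " + p));
        relay.register("send-report", p -> System.out.println("reporting " + p));
        relay.relay("resize-image", "{\"file\":\"a.png\"}");
        System.out.println("handled unknown type: " + relay.relay("unknown", "{}"));
    }
}
```

Compared with broadcasting every message to all consumers, this keeps the routing decision in one place and makes unknown types easy to detect.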
Re: [akka-user] Lost bytes with the akka stream TcpEcho example
That is probably https://github.com/akka/akka/issues/15572

/Patrik

On Wed, Aug 13, 2014 at 11:50 AM, Endre Varga endre.va...@typesafe.com wrote:

There is a bug in Akka Tcp which causes this. I don't remember the ticket number, but it is fixed and will be part of the next Akka bugfix release; streams will then be fixed at the next release.

On 2014.08.13. 7:37, William Le Ferrand warne...@gmail.com wrote:

Dear List,

I'm facing a puzzling issue with (a slightly modified version of) the example code TcpEcho.scala. I'm sending 6 MB from the server to the client:

- when I only count the bytes received, I get the right byte count back (6M)
- when I do something as simple as allocating an array inside the operation applied to each bytestring, I start losing bytes

I reduced the issue to a minimal code example: https://gist.github.com/williamleferrand/77b4e40787b593d1f00d . If you run it, you'll see that you don't get exactly 6M bytes back.

Would anyone have a clue why I'm losing these bytes?

Thanks in advance,

Best, William

--
Patrik Nordwall
Typesafe http://typesafe.com/ - Reactive apps on the JVM
Twitter: @patriknw
Re: [akka-user] [akka-stream] Delayed retry
On Wednesday, August 13, 2014 1:24:34 PM UTC+4, √ wrote: On Wed, Aug 13, 2014 at 11:06 AM, Evgeniy Ostapenko sml...@gmail.com wrote: On Wednesday, August 13, 2014 12:05:48 PM UTC+4, √ wrote: On Wed, Aug 13, 2014 at 9:01 AM, Evgeniy Ostapenko sml...@gmail.com wrote:

:-) But the implementation of Producer could be from N different libraries.

Making a given Producer type concrete can be useful :)

What do you mean?

```
val producer: Producer[_] = getUnknownProducerFromOtherLibrary..
val concreteProducer: Producer[_] = ProducerWithKnownSettings(producer, settings)

// or

implicit class RichProducer[T](producer: Producer[T]) {
  def withSettings(settings: StreamSettings): Producer[T] = ProducerWithKnownSettings(producer, settings)
}
val concreteProducer = producer withSettings StreamSettings(exceptionHandler = log.error(..., _))
```

But you can do that already, it's perfectly possible to attach a Processor to a Producer from some other library and expose it as a Producer with more info to others.

Sounds good, can you show a simple example? I need to see your example to understand exactly what you mean.

I'm still not sure what and who needs what properties and why it is useful, do you have a concrete example?

I started a simple project which uses stream settings. For now it is only a proof of concept, but I think it can serve as a concrete example. And I am sorry for the project name - I will rename it anyway. I am not sure whether I need private[akka] classes in that project or not, so the package name is akka for now.

So if I understand your example, it is useful for your particular library, which adds features that were not intended for akka-streams? I'm not sure that's a good enough selling point for the feature request.

Yes, a lot of the features in my library are not intended for akka-streams. But I think preserving consistency inside zip, and sometimes after merge transformations, is important for akka-streams too.

In what way is zip not consistent?
(merge is inconsistent by design, and concat is consistent by design) val p = Flow(0 to 9).toProducer(materializer) val p2 = Flow(p).filter(_%2 == 0).toProducer(materializer) Flow(p).zip(p2).foreach(println).consume(materializer) or more complicated example: val consistent = Flow(0 to 9).toProducer(materializer) val p2 = Flow(p).filter(_%2 == 0).toProducer(materializer) val merged = Flow(consistent).filter(_%2 != 0).merge(p2) // inconsistent by design - ok it is really not problem Flow(consistent).zip(merged).foreach(println).consume(materializer) // inconsistent because any (not only zip or merge) operation with inconsistent stream will produce only inconsistent stream Otherwise from user side you must always remember about consistency and work with that transformations very carefully. Also a few settings can be effectively fixed in low level code - for example instead wrapping each transformations in Try block on top of akka-streams it possible to wrap only next event generator. Hmmm, do you have an example of that (code)? For example inside TransformProcessorImpl: emits = transformer.onNext(primaryInputs.dequeueInputElement()) can be changed to emits = _settings.exceptionStrategyExecutor(transformer.onNext(primaryInputs.dequeueInputElement())) and inside ActorProducerImpl: pushToDownstream(withCtx(context)(f())) to pushToDownstream(withCtx(context)(settings. exceptionStrategyExecutor(f( or maybe it can be called simply settings.executor or settings.runner. Also it can be parameter like: settings.throwExceptions: Boolean (if (settings.throwExceptions) f() else Try(f()).recover { log but not push to downstream } . Eventually all of this can be inside MaterializeSettings. However I think MaterializeSettings is the info about as create stream with akka. In difference StreamSettings is info only about the natural stream properties not linked to any library. 
And thank you for your answers again:) You're most welcome, thanks for asking questions :) https://github.com/smlin/akka-rx I am sorry for PR :) -- Cheers, √
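The idea raised in this message, wrapping only the "next event" step in a Try instead of every transformation, can be illustrated with plain Scala. This is only a minimal sketch under stated assumptions: `StreamSettings`, `runStep` and `safeDivisions` are hypothetical names invented here, and nothing below is akka-streams API or the code of the akka-rx project.

```scala
import scala.util.{Failure, Success, Try}

object ExceptionStrategySketch {
  // Hypothetical settings type for illustration; it is not part of akka-streams.
  final case class StreamSettings(throwExceptions: Boolean, onError: Throwable => Unit)

  // Runs one transformation step. When throwExceptions is false, a failure is
  // handed to onError and nothing is emitted downstream for that element.
  def runStep[A](settings: StreamSettings)(step: => Seq[A]): Seq[A] =
    if (settings.throwExceptions) step
    else Try(step) match {
      case Success(emitted) => emitted
      case Failure(error) =>
        settings.onError(error)
        Seq.empty
    }

  // Divides 8 by each divisor; the element whose step fails is skipped.
  def safeDivisions(divisors: Seq[Int]): Seq[Int] = {
    val settings = StreamSettings(throwExceptions = false, onError = e => println(s"skipped: $e"))
    divisors.flatMap(d => runStep(settings)(Seq(8 / d)))
  }

  def main(args: Array[String]): Unit =
    // Prints a "skipped" notice for the zero divisor, then List(2, 4, 8).
    println(safeDivisions(Seq(4, 2, 0, 1)))
}
```

With `throwExceptions = true` the same call would propagate the ArithmeticException, which is the change in operational semantics that a single flag of this kind would carry.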
Re: [akka-user] [akka-stream] Delayed retry
On Wed, Aug 13, 2014 at 12:55 PM, Evgeniy Ostapenko sml...@gmail.com wrote: On Wednesday, August 13, 2014 1:24:34 PM UTC+4, √ wrote: On Wed, Aug 13, 2014 at 11:06 AM, Evgeniy Ostapenko sml...@gmail.com wrote: On Wednesday, August 13, 2014 12:05:48 PM UTC+4, √ wrote: On Wed, Aug 13, 2014 at 9:01 AM, Evgeniy Ostapenko sml...@gmail.com wrote:

:-) But the implementation of Producer could be from N different libraries.

Making a given Producer type concrete can be useful :)

What do you mean?

```
val producer: Producer[_] = getUnknownProducerFromOtherLibrary..
val concreteProducer: Producer[_] = ProducerWithKnownSettings(producer, settings)

// or

implicit class RichProducer[T](producer: Producer[T]) {
  def withSettings(settings: StreamSettings): Producer[T] = ProducerWithKnownSettings(producer, settings)
}
val concreteProducer = producer withSettings StreamSettings(exceptionHandler = log.error(..., _))
```

But you can do that already, it's perfectly possible to attach a Processor to a Producer from some other library and expose it as a Producer with more info to others.

Sounds good, can you show a simple example? I need to see your example to understand exactly what you mean.

Either stay within the Akka impl: Flow(p).transform(doAnythingYouWant).toProducer(materializer)

Or: implement your own Processor that inherits whichever interfaces/classes you need/want to expose.

I'm still not sure what and who needs what properties and why it is useful, do you have a concrete example?

I started a simple project which uses stream settings. For now it is only a proof of concept, but I think it can serve as a concrete example. And I am sorry for the project name - I will rename it anyway. I am not sure whether I need private[akka] classes in that project or not, so the package name is akka for now.

So if I understand your example, it is useful for your particular library, which adds features that were not intended for akka-streams?
I'm not sure that's a good enough selling point for the feature request. Yes, a lot of features from my library not intended for akka-streams. But I think saving consistency inside zip and sometimes after merge transformations is important for akka-streams also. In what way is not zip consistent? (merge is inconsistent by design, and concat is consistent by design) val p = Flow(0 to 9).toProducer(materializer) val p2 = Flow(p).filter(_%2 == 0).toProducer(materializer) Flow(p).zip(p2).foreach(println).consume(materializer) Welcome to Scala version 2.11.1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0). Type in expressions to have them evaluated. Type :help for more information. scala (0 to 9).zip(1 to 9 by 2).foreach(println) (0,1) (1,3) (2,5) (3,7) (4,9) or more complicated example: val consistent = Flow(0 to 9).toProducer(materializer) val p2 = Flow(p).filter(_%2 == 0).toProducer(materializer) val merged = Flow(consistent).filter(_%2 != 0).merge(p2) // inconsistent by design - ok it is really not problem Flow(consistent).zip(merged).foreach(println).consume(materializer) // inconsistent because any (not only zip or merge) operation with inconsistent stream will produce only inconsistent stream Could you please define consistent here? I think our definitions diverge. Otherwise from user side you must always remember about consistency and work with that transformations very carefully. Also a few settings can be effectively fixed in low level code - for example instead wrapping each transformations in Try block on top of akka-streams it possible to wrap only next event generator. Hmmm, do you have an example of that (code)? For example inside TransformProcessorImpl: emits = transformer.onNext(primaryInputs.dequeueInputElement()) can be changed to emits = _settings.exceptionStrategyExecutor(transformer.onNext(primaryInputs.dequeueInputElement())) and inside ActorProducerImpl: pushToDownstream(withCtx(context)(f())) to pushToDownstream(withCtx(context)(settings. 
exceptionStrategyExecutor(f( or maybe it can be called simply settings.executor or settings.runner. Also it can be parameter like: settings.throwExceptions: Boolean (if (settings.throwExceptions) f() else Try(f()).recover { log but not push to downstream } . I don't think such properties should be transparent in the API (in settings) since they change the operational semantics. Eventually all of this can be inside MaterializeSettings. However I think MaterializeSettings is the info about as create stream with akka. In difference StreamSettings is info only about the natural stream properties not linked to any library. You are of course free to implement your own version of Reactive Streams and provide your own combinators and settings properties which would, if following the spec, be completely interoperable with other implementations. And thank you for your answers
[akka-user] Re: Handling Exception in Akka
I have a master worker which has a supervisor strategy to supervise its child worker. The child worker throws an IOException, which is seen by the parent, who then calls the resume()/restart() method of SupervisorStrategy. So far, when the IOException is thrown it is passed to the parent and the parent calls the resume/restart method, but after that I get a timeout exception. I want the parent to try to resume/restart the child worker n times before stopping gracefully with a message. But what I am getting with this code is an exception after a single try.

On Tuesday, August 12, 2014 11:28:12 AM UTC+5:30, soumya sengupta wrote: I have a master worker

    public class EmailServiceActor extends UntypedActor {
        ActorRef actorRef;

        private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"),
            new Function<Throwable, Directive>() {
                @Override
                public Directive apply(Throwable t) {
                    if (t instanceof IOException) {
                        System.out.println("IO Exception occurred");
                        return restart();
                    } else if (t instanceof Exception) {
                        return stop();
                    } else {
                        return escalate();
                    }
                }
            });

        @Override
        public void onReceive(Object message) {
            System.out.println("Received .");
            if (message instanceof MyLocalMessage) {
                System.out.println("received instr from EmailServiceWorker by EmailServiceActor");
                actorRef.tell(message, self());
            } else {
                actorRef = getSender();
                System.out.println("received instr by EmailServiceActor from Application");
                getContext().actorOf(Props.create(EmailServiceWorker.class), "EmailServiceWorker").tell(message, self());
            }
        }

        @Override
        public SupervisorStrategy supervisorStrategy() {
            return strategy;
        }

        @Override
        public void preStart() {
            System.out.println("Pre Restart...");
        }
    }

I also have a child worker

    public class EmailServiceWorker extends UntypedActor {
        @Override
        public void onReceive(Object message) throws IOException {
            System.out.println("received instr by EmailServiceWorker");
            System.out.println("Sending mail");
            FileReader reader = new FileReader("someFile");
            MyLocalMessage myLocalMessage = new MyLocalMessage("Hello");
            getSender().tell(myLocalMessage, getSelf());
            getContext().stop(getSelf());
        }

        @Override
        public void preStart() {
            System.out.println("Pre Restart");
        }
    }

There is no such file as someFile. I am deliberately trying to raise the error. But when I run the application I get the error

    [info] play - Application started (Dev)
    Pre Restart...
    Received .
    received instr by EmailServiceActor from Application
    Pre Restart
    received instr by EmailServiceWorker
    Sending mail
    IO Exception occurred
    Pre Restart
    [ERROR] [08/12/2014 10:53:13.844] [play-akka.actor.default-dispatcher-5] [akka://play/user/EmailServiceActor/EmailServiceWorker] someFile (No such file or directory)
    java.io.FileNotFoundException: someFile (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:146)
        at java.io.FileInputStream.<init>(FileInputStream.java:101)
        at java.io.FileReader.<init>(FileReader.java:58)
        at actors.EmailServiceWorker.onReceive(EmailServiceWorker.java:18)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    [error] play - Cannot invoke the action, eventually got an error: akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka://play/), Path(/user/EmailServiceActor)]] after [19 ms]
    [error] application - ! @6j77j81fk - Internal server error, for (GET) [/sendmail] -
[akka-user] Re: Handling Exception in Akka
Why should it give a timeout before even trying 3 times to restart/resume the worker, when my strategy says OneForOneStrategy(3, Duration.create("1 minute"), ...)? Am I missing something?
Re: [akka-user] [akka-stream] Delayed retry
On Wed, Aug 13, 2014 at 3:11 PM, Evgeniy Ostapenko sml...@gmail.com wrote: On Wednesday, August 13, 2014 3:51:09 PM UTC+4, √ wrote: On Wed, Aug 13, 2014 at 12:55 PM, Evgeniy Ostapenko sml...@gmail.com wrote: On Wednesday, August 13, 2014 1:24:34 PM UTC+4, √ wrote: On Wed, Aug 13, 2014 at 11:06 AM, Evgeniy Ostapenko sml...@gmail.com wrote: On Wednesday, August 13, 2014 12:05:48 PM UTC+4, √ wrote: On Wed, Aug 13, 2014 at 9:01 AM, Evgeniy Ostapenko sml...@gmail.com wrote:

:-) But the implementation of Producer could be from N different libraries.

Making a given Producer type concrete can be useful :)

What do you mean?

```
val producer: Producer[_] = getUnknownProducerFromOtherLibrary..
val concreteProducer: Producer[_] = ProducerWithKnownSettings(producer, settings)

// or

implicit class RichProducer[T](producer: Producer[T]) {
  def withSettings(settings: StreamSettings): Producer[T] = ProducerWithKnownSettings(producer, settings)
}
val concreteProducer = producer withSettings StreamSettings(exceptionHandler = log.error(..., _))
```

But you can do that already, it's perfectly possible to attach a Processor to a Producer from some other library and expose it as a Producer with more info to others.

Sounds good, can you show a simple example? I need to see your example to understand exactly what you mean.

Either stay within the Akka impl: Flow(p).transform(doAnythingYouWant).toProducer(materializer)

Or: implement your own Processor that inherits whichever interfaces/classes you need/want to expose.

Are you proposing that the user copy private Akka *Impl classes just to change a few lines of code? :) Of course I can do this.

You are free to do what you want :)

I'm still not sure what and who needs what properties and why it is useful, do you have a concrete example?

I started a simple project which uses stream settings. For now it is only a proof of concept, but I think it can serve as a concrete example.
And I am sorry for project name - I will be rename it anyway. I not sure, whether I need private[akka] classes in that project or not - so a package name is akka now. So if I understand your example, it is useful for things your particular library that adds features that were not intended for akka-streams? I'm not sure that's a good enough selling point for the feature request. Yes, a lot of features from my library not intended for akka-streams. But I think saving consistency inside zip and sometimes after merge transformations is important for akka-streams also. In what way is not zip consistent? (merge is inconsistent by design, and concat is consistent by design) val p = Flow(0 to 9).toProducer(materializer) val p2 = Flow(p).filter(_%2 == 0).toProducer(materializer) Flow(p).zip(p2).foreach(println).consume(materializer) Welcome to Scala version 2.11.1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0). Type in expressions to have them evaluated. Type :help for more information. scala (0 to 9).zip(1 to 9 by 2).foreach(println) (0,1) (1,3) (2,5) (3,7) (4,9) or more complicated example: val consistent = Flow(0 to 9).toProducer(materializer) val p2 = Flow(p).filter(_%2 == 0).toProducer(materializer) val merged = Flow(consistent).filter(_%2 != 0).merge(p2) // inconsistent by design - ok it is really not problem Flow(consistent).zip(merged).foreach(println).consume(materializer) // inconsistent because any (not only zip or merge) operation with inconsistent stream will produce only inconsistent stream Could you please define consistent here? I think our definitions diverge. In case of zip, consistent means that a zip must produce pairs of events, taken from transformations of one event in past. 
    source: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
    |\
    | filter(_ % 2 == 0)   // 0, 2, 4, 6, 8
    |/
    (0, 0), (2, 2), (4, 4), (6, 6), (8, 8)   // consistent zip
    (0, 0), (1, 2), (2, 4), (3, 6), (4, 8)   // inconsistent

I'm sorry but I think the standard library disagrees with you:

    scala> val i = 0 to 9
    i: scala.collection.immutable.Range.Inclusive = Range(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    scala> val i2 = i filter (_ % 2 == 0)
    i2: scala.collection.immutable.IndexedSeq[Int] = Vector(0, 2, 4, 6, 8)
    scala> i zip i2 foreach println
    (0,0)
    (1,2)
    (2,4)
    (3,6)
    (4,8)

Otherwise, on the user side you must always remember about consistency and work with those transformations very carefully. Also, a few settings can be effectively fixed in low-level code - for example, instead of wrapping each transformation in a Try block on top of akka-streams, it is possible to wrap only the next-event generator.

Hmmm, do you
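The two readings of zip in this exchange can be compared using only the Scala standard library. This is a sketch for illustration, not akka-streams code: `positional` reproduces the standard-library behaviour shown in the REPL session, while `byOrigin` is one hypothetical way to get the pairing called "consistent" in the thread, by carrying each source element along with the value derived from it.

```scala
object ZipPairingSketch {
  // Positional zip, as in the standard library: the n-th element of one side
  // is paired with the n-th element of the other.
  def positional(source: Seq[Int]): Seq[(Int, Int)] =
    source.zip(source.filter(_ % 2 == 0))

  // Origin-based pairing: each element travels together with its origin, so a
  // result can only be paired with the source element it came from.
  def byOrigin(source: Seq[Int]): Seq[(Int, Int)] =
    source.map(x => (x, x)).filter { case (_, derived) => derived % 2 == 0 }

  def main(args: Array[String]): Unit = {
    val source = 0 to 9
    println(positional(source).mkString(" ")) // (0,0) (1,2) (2,4) (3,6) (4,8)
    println(byOrigin(source).mkString(" "))   // (0,0) (2,2) (4,4) (6,6) (8,8)
  }
}
```

The second form needs no special zip at all: the origin is threaded through the pipeline explicitly, which is how the same result can be obtained with ordinary combinators.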
[akka-user] akka-persistence cassandra journal plugin api
Hi, I am using the journal plugin for akka-persistence listed here: https://github.com/krasserm/akka-persistence-cassandra/. However, I see it is mentioned that it works only with Cassandra 2.0.3 and higher, and the Cassandra configured in our project is version 1.2.13.2. Does that mean we would be unable to use this? After configuring the Cassandra-based journal plugin, when I start our Akka application I see the error below (the configured Cassandra host has a version lower than 2.0.3). Kindly help.

akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:164) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.ActorCell.create(ActorCell.scala:596) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.4.jar:na] at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.4.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.4.jar:na] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.4.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na] Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 2:29 missing '=' at 'EXISTS' at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:35) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:172) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36) ~[cassandra-driver-core-2.0.3.jar:na] at akka.persistence.cassandra.journal.CassandraJournal.init(CassandraJournal.scala:52) ~[akka-persistence-cassandra_2.10-0.3.3.jar:0.3.3] at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.7.0_51] at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) ~[na:1.7.0_51] at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.7.0_51] at java.lang.reflect.Constructor.newInstance(Constructor.java:526) ~[na:1.7.0_51] at java.lang.Class.newInstance(Class.java:374) ~[na:1.7.0_51] at akka.util.Reflect$.instantiate(Reflect.scala:45) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.NoArgsReflectConstructor.produce(Props.scala:361) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.Props.newActor(Props.scala:252) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.ActorCell.newActor(ActorCell.scala:552) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.ActorCell.create(ActorCell.scala:578) ~[akka-actor_2.10-2.3.4.jar:na] ... 
9 common frames omitted com.datastax.driver.core.exceptions.SyntaxError: line 2:29 missing '=' at 'EXISTS' at com.datastax.driver.core.Responses$Error.asException(Responses.java:95) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:108) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:367) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:582) ~[cassandra-driver-core-2.0.3.jar:na] at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.9.0.Final.jar:na] at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.9.0.Final.jar:na] at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.9.0.Final.jar:na] at
Re: [akka-user] akka-persistence cassandra journal plugin api
Hello there, I'm no Cassandra guru, but if something is implemented for 2.x I'd expect it not to work with 1.x. Can you not use a newer version of Cassandra? -- konrad
Re: [akka-user] [akka-stream] Delayed retry
On Wed, Aug 13, 2014 at 4:32 PM, Evgeniy Ostapenko sml...@gmail.com wrote: I'm sorry but I think the standard library disagrees with you:
scala> val i = 0 to 9
i: scala.collection.immutable.Range.Inclusive = Range(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> val i2 = i filter (_ % 2 == 0)
i2: scala.collection.immutable.IndexedSeq[Int] = Vector(0, 2, 4, 6, 8)
scala> i zip i2 foreach println
(0,0)
(1,2)
(2,4)
(3,6)
(4,8)
OK. But what do you think will be printed in my last, more complicated example? Can you predict that (no)? Can you predict what will be printed with the standard library (yes)? Since you chose to use a non-deterministic combinator, of course the result is going to be non-deterministic. You got exactly what you ordered: if you choose to call System.exit(1), the system will exit. Why do you refer to the standard library? Akka-streams includes not only a sequence abstraction but also a time abstraction. It is impossible and wrong to compare such different APIs. Why so? You made a claim regarding intuitiveness, and I gave an example where the same intuition as for the standard library holds; what's wrong with that? For example, what does merge mean in the standard library? There is none. So there is nothing to expect. What about concat for two infinite streams in Akka-streams? What does the following do:
scala> Iterator from 1
res3: Iterator[Int] = non-empty iterator
scala> Iterator from 1
res4: Iterator[Int] = non-empty iterator
scala> res3 ++ res4
res5: Iterator[Int] = non-empty iterator
res5.whatever
What about fold or lastOption in the case of an infinite stream? For an infinite stream they will not terminate; this is exactly the same as for an infinite Iterator, as in my example above. Could you give an example of what is non-intuitive and what is complicated (so we can try to address it)? Eventually all of this can be inside MaterializeSettings. However, I think MaterializeSettings is the info about how to create a stream with Akka.
In contrast, StreamSettings is info only about the natural stream properties, not linked to any library. You are of course free to implement your own version of Reactive Streams and provide your own combinators and settings properties which would, if following the spec, be completely interoperable with other implementations. Of course. That is always true :) Reactive Streams specification: ... Subscriber: - must not accept an onSubscribe event if it already has an active Subscription [7] TwoStreamInputProcessor So Akka-streams has stepped outside the Reactive Streams specification in that area and can't refer to it anymore. Akka-streams needs to extend the specification, I think. Or remove join operations like zip, merge and others. There's nothing that says that an object X that has 2 Subscribers internally where the first one is connected to Publisher A and the second one to Publisher B, so I see no problem implementing zip, merge, concat or others. Object X is TwoStreamInputProcessor, but why is it named Processor? :) I think it is because you need to implement Processor with an ActorRef as impl. But in that case the subscribers can't be held in the Processor and are held inside ..Processor extends Actor. But after this somebody forgot that the impl must anyway satisfy the Processor requirements - this is an implementation of .org.reactivestreams.api.Processor (it does not extend it, but anyway). And what do you think you get if you call ActorProcessor:getSubscriber for a Zip node? The first subscriber or the second? And how can you get the second? TwoStreamInputProcessor does something that goes outside the Reactive Streams interface and ideology. The Akka team is doing big and excellent work, but if you have already broken the ideology of Reactive Streams, you can do so later as well; who cares? :) Akka Streams is highly experimental, both in terms of implementation and in terms of API.
We are going to be spec and TCK compliant when Reactive Streams has an updated TCK (currently in progress), and we're reworking the Akka Streams implementation as we speak. And I have already presented an implementation that will preserve spec compliance as well as support join operations, so you can rest assured that Akka Streams is going to be RS compliant and also non-experimental in the future. If you can do TwoStreamInputProcessor, why can't you do a consistent zip? :) I still have no idea what a consistent zip is or why it is useful, and have shown that the behaviour of zip is isomorphic to the one in the standard library. And OK. I hope the Akka team will fix this and extend the Reactive Streams spec with MultiSubscriberProcessor and requirements for zip, merge, etc. As I've explained above, this is not needed, as it is expressible under the current (and hopefully future) revisions of the Reactive Streams specification. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives:
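For readers who want to reproduce the standard-library comparison from this thread outside the REPL, here is a minimal sketch in plain Scala. It uses only the standard library (no Akka Streams API), and the object and method names (`ZipSemantics`, `firstOfConcat`) are made up for illustration. It shows that `zip` pairs elements by position and stops at the shorter input, and that `++` on two infinite iterators is lazy, so only operations that need the end of the stream fail to terminate.

```scala
object ZipSemantics {
  val i: Range = 0 to 9
  val i2: IndexedSeq[Int] = i.filter(_ % 2 == 0)

  // zip pairs elements positionally and stops when the shorter input ends.
  val zipped: IndexedSeq[(Int, Int)] = i.zip(i2)

  // Concatenating two infinite iterators is lazy: taking a finite prefix
  // terminates, and the second iterator is never reached.
  def firstOfConcat(n: Int): List[Int] =
    (Iterator.from(1) ++ Iterator.from(1)).take(n).toList

  def main(args: Array[String]): Unit = {
    zipped.foreach(println)      // prints the five pairs (0,0) to (4,8)
    println(firstOfConcat(3))    // a finite prefix of the concatenation
  }
}
```

Calling `fold` or `lastOption` on the concatenated iterator instead of `take` would never return, which is the non-termination point made in the reply above.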
[akka-user] [akka-stream] Looking for artifact containing `akka.stream.testkit.StreamTestKit`
I feel like I should have been able to figure this out by myself already, but I'm swinging and missing and need some help. In what artifact does `akka.stream.testkit.StreamTestKit https://github.com/akka/akka/blob/akka-stream-and-http-experimental-0.4/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala` live? Is there an `akka-stream-experimental-test` artifact available (in 0.4+), or is this still considered an internal piece of testing code, and therefore not exported publicly? Thanks! Simeon -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] [akka-stream] Looking for artifact containing `akka.stream.testkit.StreamTestKit`
Hi Simeon, Akka stream testkit is not published in any artifact yet. We will provide a testkit in a later version, but right now it is not good enough. /Patrik On Wed, Aug 13, 2014 at 6:24 PM, Simeon Fitch fi...@datamininglab.com wrote: I feel like I should have been able to figure this out by myself already, but I'm swinging a miss and need some help. In what artifact does `akka.stream.testkit.StreamTestKit https://github.com/akka/akka/blob/akka-stream-and-http-experimental-0.4/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala` live? Is there a `akka-stream-experimental-test` artifact available (in 0.4+), or is this still considered an internal piece of testing code, and therefore not exported publically? Thanks! Simeon -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw
[akka-user] Re: Akka Remoting Failures with Amazon EC2
Hi Ashvin, have you opened port 2552 (for TCP communication) on the EC2 servers? By default this port is not open. On Wednesday, July 16, 2014 11:20:09 PM UTC+5:30, Ashvin Nair wrote: Hi everyone, I'm building a library with actors to do some large-scale data crunching. I'm running my code on Amazon EC2 spot instances using StarCluster. The program is unstable because the actor remoting sometimes drops: while the code is running, nodes will disconnect one by one within a few minutes. The nodes say something like:

[ERROR] [07/16/2014 17:40:06.837] [slave-akka.actor.default-dispatcher-4] [akka://slave/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fslave%40master%3A2552-0/endpointWriter] AssociationError [akka.tcp://slave@node005:2552] - [akka.tcp://slave@master:2552]: Error [Association failed with [akka.tcp://slave@master:2552]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://slave@master:2552] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: master

and

[WARN] [07/16/2014 17:30:05.548] [slave-akka.actor.default-dispatcher-12] [Remoting] Tried to associate with unreachable remote address [akka.tcp://slave@master:2552]. Address is now quarantined, all messages to this address will be delivered to dead letters.

This happens even though I can ping between the nodes just fine. I've been trying to fix this; I figure it's some configuration setting. The Akka remoting documentation even says, "However in cloud environments, such as Amazon EC2, the value could be increased to 12 in order to account for network issues that sometimes occur on such platforms." However, I've set that and beyond and still have had no luck fixing the issue. Here are my current remoting configurations:

akka {
  actor {
    provider = akka.remote.RemoteActorRefProvider
  }
  remote {
    enabled-transports = [akka.remote.netty.tcp]
    netty.tcp {
      port = 2552
      # for modelling
      #send-buffer-size = 5000b
      #receive-buffer-size = 5000b
      #maximum-frame-size = 2500b
      send-buffer-size = 500b
      receive-buffer-size = 500b
      maximum-frame-size = 250b
    }
    watch-failure-detector.threshold = 100
    acceptable-heartbeat-pause = 20s
    transport-failure-detector {
      heartbeat-interval = 4 s
      acceptable-heartbeat-pause = 20 s
    }
  }
  log-dead-letters = off
}

and I deploy my actors like so, all from the master node:

val o2m = system.actorOf(Props(classOf[IntOneToMany], p), name = "o2m")
val remote = Deploy(scope = RemoteScope(Address("akka.tcp", "slave", args(i), 2552)))
val b = system.actorOf(Props(classOf[IntBoss], o2m).withDeploy(remote), name = "boss_" + i)

etc. Can anyone point me to a mistake I'm making, or to how I can fix this problem and stop nodes from disconnecting? Alternatively, some solution that just re-launches the actors if they are disconnected also works; I don't care much about dropped messages. In fact I thought this was supposed to be easily configurable behavior, but I'm finding it difficult to find the right place to look for that. Thank you very much, Ashvin
Re: [akka-user] Lost bytes with the akka stream TcpEcho example
Thanks everybody; indeed using 2.4-SNAPSHOT fixed it! On Wednesday, August 13, 2014 2:58:01 AM UTC-7, Patrik Nordwall wrote: That is probably https://github.com/akka/akka/issues/15572 /Patrik On Wed, Aug 13, 2014 at 11:50 AM, Endre Varga endre...@typesafe.com wrote: There is a bug in Akka Tcp which causes this. I don't remember the ticket number, but it is fixed and will be part of the next Akka bugfix release; streams will then be fixed in their next release. On 2014.08.13 at 7:37, William Le Ferrand warn...@gmail.com wrote: Dear List, I'm facing a puzzling issue with (a slightly modified version of) the example code TcpEcho.scala. I'm sending 6 MB from the server to the client: - when I only count the bytes received, I get the right byte count back (6M) - when I do something as simple as allocating an array inside the operation applied to each bytestring, I start losing bytes. I reduced the issue to a minimal code example: https://gist.github.com/williamleferrand/77b4e40787b593d1f00d . If you run it, you'll see that you don't get exactly 6M bytes back. Would anyone have a clue as to why I'm losing these bytes? Thanks in advance, Best, William -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw
Re: [akka-user] [akka-stream] Looking for artifact containing `akka.stream.testkit.StreamTestKit`
Thanks Patrik. Glad to know. On Wednesday, August 13, 2014 1:10:02 PM UTC-4, Patrik Nordwall wrote: Hi Simeon, Akka stream testkit is not published in any artifact yet. We will provide a testkit in a later version, but right now it is not good enough. /Patrik On Wed, Aug 13, 2014 at 6:24 PM, Simeon Fitch fi...@datamininglab.com wrote: I feel like I should have been able to figure this out by myself already, but I'm swinging a miss and need some help. In what artifact does `akka.stream.testkit.StreamTestKit https://github.com/akka/akka/blob/akka-stream-and-http-experimental-0.4/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala` live? Is there a `akka-stream-experimental-test` artifact available (in 0.4+), or is this still considered an internal piece of testing code, and therefore not exported publically? Thanks! Simeon -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw
[akka-user] [akka-stream] What is the proper pattern for merging results from Duct[U] producing Future[T]s.
I have a processing pipeline whereby a Duct terminates in a Future[T] type. I'd like to know the recommended approach for asynchronously merging the results from a number of Future[T]s, allowing the results to be passed downstream as they become available. To demonstrate what I want and how I've been attempting to solve this problem, here's some test code:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{FlowMaterializer, MaterializerSettings}
import akka.testkit._
import org.scalatest.FunSpecLike

import scala.concurrent.Future
import scala.concurrent.duration._

class FutureMergeTest extends TestKit(ActorSystem()) with FunSpecLike with ImplicitSender {
  val materializer = FlowMaterializer(MaterializerSettings())

  // Function that artificially inserts a delay in attempt to reorder results.
  val maybeDelay = (i: Int) ⇒ { if (i % 2 == 0) Thread.sleep(2000); i }

  implicit val ec = system.dispatcher

  describe("Flow producing futures") {
    it("should be able to feed futures to Flow after materialization") {
      val futureProducingDuct = Duct[Int]
        .map { i ⇒ Future { maybeDelay(i); i } }
      val futureResolvingDuct = Duct[Future[Int]]
        .mapFuture(identity)
        .map { i ⇒ println("Post future result: " + i); i }
      val summationDuct = Duct[Int]
        .fold(Seq.empty[Int])(_ :+ _)
      val sendSelfResultDuct = Duct[Seq[Int]]
        .foreach(total ⇒ testActor ! total)

      val numRange = 1 to 10
      val mainFlow = Flow(numRange.toIterator)
        .append(futureProducingDuct)
        .append(futureResolvingDuct)
        .append(summationDuct)
        .append(sendSelfResultDuct)

      note("materializing flow")
      mainFlow.consume(materializer)

      note("awaiting answer")
      val results = expectMsgType[Seq[Int]](30.seconds)
      assert(results.size == numRange.size)
      assert(results.sum == numRange.sum)
      // Asserts that the ordering is not the same
      // Assuming delay inserted in some arbitrary futures should allow others to come through
      // as they are available
      assert(results != numRange)

      note("ensuring single result")
      expectNoMsg(30.seconds)
    }
  }
}

The assumption here is that, by inserting a delay for every other input item (the even ones), the ordering on the output side should differ from the input. If you run this, that is not the case:

Post future result: 1
[[output pauses here]]
Post future result: 2
Post future result: 3
Post future result: 4
Post future result: 5
Post future result: 6
Post future result: 7
Post future result: 8
Post future result: 9
13:55:34 INFO [default-akka.actor.default-dispatcher-5] a.a.RepointableActorRef - Message [akka.stream.impl.RequestMore] from Actor[akka://default/deadLetters] to Actor[akka://default/user/flow-1-1-map#477915073] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Post future result: 10
13:55:36 INFO [default-akka.actor.default-dispatcher-2] a.a.RepointableActorRef - Message [akka.stream.impl.RequestMore] from Actor[akka://default/deadLetters] to Actor[akka://default/user/flow-1-2-mapFuture#-1579107088] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
13:55:36 INFO [default-akka.actor.default-dispatcher-2] a.a.RepointableActorRef - Message [akka.stream.impl.RequestMore] from Actor[akka://default/deadLetters] to Actor[akka://default/user/flow-1-3-map#-1301380585] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) equaled Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
org.scalatest.exceptions.TestFailedException: List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) equaled Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:498)
  ...

Any recommendations on the proper way of doing this? Am I using mapFuture improperly here? Or should I instead be writing a custom ActorConsumer and sending it messages outside of the streams API? Or should I be using Future.onSuccess callbacks in some way? Thanks, Simeon
[akka-user] Re: [akka-stream] What is the proper pattern for merging results from Duct[U] producing Future[T]s.
Another way of looking at my question might be this: How does one do zip-, tee- and merge-like operations on Ducts and Flows before the materialization phase?
Re: [akka-user] [akka-stream] Delayed retry
On Wednesday, August 13, 2014, 19:58:50 UTC+4, √ wrote: On Wed, Aug 13, 2014 at 4:32 PM, Evgeniy Ostapenko sml...@gmail.com wrote: I'm sorry but I think the standard library disagrees with you:
scala> val i = 0 to 9
i: scala.collection.immutable.Range.Inclusive = Range(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> val i2 = i filter (_ % 2 == 0)
i2: scala.collection.immutable.IndexedSeq[Int] = Vector(0, 2, 4, 6, 8)
scala> i zip i2 foreach println
(0,0)
(1,2)
(2,4)
(3,6)
(4,8)
OK. But what do you think will be printed in my last, more complicated example? Can you predict that (no)? Can you predict what will be printed with the standard library (yes)? Since you chose to use a non-deterministic combinator, of course the result is going to be non-deterministic. You got exactly what you ordered: if you choose to call System.exit(1), the system will exit. Merge is non-deterministic. But zip is deterministic; the problem here is when you have a deterministic zip but one or both of the streams is non-consistent (currently, after a merge). Even though zip is deterministic, you get a non-consistent result stream. For example, you have a method which requires two flows as parameters: def modifyFlow(flow1: Flow[Int], flow2: Flow[Int]): Flow[(Int, Int)] = { flow1 zip flow2.toProducer } The problem: how can you know from inside this method whether the result is consistent or not? In the standard library you can require Seq for consistent or Set for inconsistent :) Why do you refer to the standard library? Akka-streams includes not only a sequence abstraction but also a time abstraction. It is impossible and wrong to compare such different APIs. Why so? You made a claim regarding intuitiveness, and I gave an example where the same intuition as for the standard library holds; what's wrong with that? For example, what does merge mean in the standard library? There is none. So there is nothing to expect. Of course there is none, because the libraries have quite different use cases. What about concat for two infinite streams in Akka-streams?
What does the following do:
scala> Iterator from 1
res3: Iterator[Int] = non-empty iterator
scala> Iterator from 1
res4: Iterator[Int] = non-empty iterator
scala> res3 ++ res4
res5: Iterator[Int] = non-empty iterator
res5.whatever
What about fold or lastOption in the case of an infinite stream? For an infinite stream they will not terminate; this is exactly the same as for an infinite Iterator, as in my example above. The same, but not exactly. First, a stream has no deterministic first element (oh my god, why not as in the standard library :). Second, Akka infinite stream elements depend on time (whether you want this or not). In the standard library (excluding the mutable package, which produces concurrency problems) you have no such dependency. Imagine you work only with mutable collections in a highly concurrent environment and without any locks. Yes, all the methods are as in the standard library, but what results do you get? It is the same thing with infinite streams without consistency control. Could you give an example of what is non-intuitive and what is complicated (so we can try to address it)? Eventually all of this can be inside MaterializeSettings. However, I think MaterializeSettings is the info about how to create a stream with Akka. In contrast, StreamSettings is info only about the natural stream properties, not linked to any library. You are of course free to implement your own version of Reactive Streams and provide your own combinators and settings properties which would, if following the spec, be completely interoperable with other implementations. Of course. That is always true :) Reactive Streams specification: ... Subscriber: - must not accept an onSubscribe event if it already has an active Subscription [7] TwoStreamInputProcessor So Akka-streams has stepped outside the Reactive Streams specification in that area and can't refer to it anymore. Akka-streams needs to extend the specification, I think. Or remove join operations like zip, merge and others.
There's nothing that says that an object X that has 2 Subscribers internally where the first one is connected to Publisher A and the second one to Publisher B, so I see no problem implementing zip, merge, concat or others. Object X is TwoStreamInputProcessor, but why is it named Processor? :) I think it is because you need to implement Processor with an ActorRef as impl. But in that case the subscribers can't be held in the Processor and are held inside ..Processor extends Actor. But after this somebody forgot that the impl must anyway satisfy the Processor requirements - this is an implementation of .org.reactivestreams.api.Processor (it does not extend it, but anyway). And what do you think you get if you call ActorProcessor:getSubscriber for a Zip node? The first subscriber or the second? And how can you get the second? TwoStreamInputProcessor does something that goes outside of
[akka-user] Re: akka-persistence cassandra journal plugin api
CQL was extended under 2.0.x to include support for the IF NOT EXISTS keywords; these were not available in 1.2. The statements in the akka.persistence.cassandra.journal.CassandraStatements trait use this syntax to only create the keyspace and tables if they do not exist. There is no real workaround for this other than to change the trait or perhaps request that the statements be externalized. As Konrad mentioned, there clearly could be other issues once you get past this hurdle, since there are some significant differences between 1.2 and 2.x. -Todd On Wednesday, August 13, 2014 11:42:00 AM UTC-4, ratika...@razorthink.net wrote: Hi, I am using the journal plugin for akka persistence as listed here https://github.com/krasserm/akka-persistence-cassandra/, however I see it's mentioned that it works only with Cassandra 2.0.3 and higher, and we have Cassandra configured in our project but at version 1.2.13.2. Does that mean we would be unable to use this? After configuring the Cassandra-based journal plugin, when I start our akka application I see the below (the configured Cassandra host has a version lower than 2.0.3).
Kindly help akka.actor.ActorInitializationException: exception during creation at akka.actor.ActorInitializationException$.apply(Actor.scala:164) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.ActorCell.create(ActorCell.scala:596) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.4.jar:na] at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.4.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.4.jar:na] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.4.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na] Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 2:29 missing '=' at 'EXISTS' at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:35) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:172) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36) ~[cassandra-driver-core-2.0.3.jar:na] at 
akka.persistence.cassandra.journal.CassandraJournal.init(CassandraJournal.scala:52) ~[akka-persistence-cassandra_2.10-0.3.3.jar:0.3.3] at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.7.0_51] at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) ~[na:1.7.0_51] at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.7.0_51] at java.lang.reflect.Constructor.newInstance(Constructor.java:526) ~[na:1.7.0_51] at java.lang.Class.newInstance(Class.java:374) ~[na:1.7.0_51] at akka.util.Reflect$.instantiate(Reflect.scala:45) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.NoArgsReflectConstructor.produce(Props.scala:361) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.Props.newActor(Props.scala:252) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.ActorCell.newActor(ActorCell.scala:552) ~[akka-actor_2.10-2.3.4.jar:na] at akka.actor.ActorCell.create(ActorCell.scala:578) ~[akka-actor_2.10-2.3.4.jar:na] ... 9 common frames omitted com.datastax.driver.core.exceptions.SyntaxError: line 2:29 missing '=' at 'EXISTS' at com.datastax.driver.core.Responses$Error.asException(Responses.java:95) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:108) ~[cassandra-driver-core-2.0.3.jar:na] at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235) ~[cassandra-driver-core-2.0.3.jar:na] at
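To make the IF NOT EXISTS point from the reply concrete, here is a small hypothetical sketch in plain Scala. It is not the plugin's code: `KeyspaceStatement`, `createKeyspace` and the `cassandraMajor` parameter are invented for illustration, and the replication settings are placeholder values. It only shows how a statement builder could leave out the guard when targeting a pre-2.0 server, which is roughly what "changing the trait" would involve.

```scala
object KeyspaceStatement {
  // Hypothetical helper, NOT part of akka-persistence-cassandra.
  // cassandraMajor is assumed to be supplied by the caller (e.g. 1 or 2).
  def createKeyspace(name: String, replicationFactor: Int, cassandraMajor: Int): String = {
    // IF NOT EXISTS is only understood by Cassandra 2.0 and later; a 1.2
    // server rejects it with a syntax error like the one in the trace above.
    val guard = if (cassandraMajor >= 2) "IF NOT EXISTS " else ""
    s"CREATE KEYSPACE ${guard}${name} " +
      s"WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : ${replicationFactor} }"
  }

  def main(args: Array[String]): Unit = {
    println(createKeyspace("akka", 1, cassandraMajor = 2))
    println(createKeyspace("akka", 1, cassandraMajor = 1))
  }
}
```

Without the guard, the caller would have to tolerate the error the server raises when the keyspace already exists, and, as noted in the reply, other 1.2 versus 2.x differences may still get in the way.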
[akka-user] Re: [akka-stream] What is the proper pattern for merging results from Duct[U] producing Future[T]s.
mapFuture preserves the order of elements (this is a Reactive Streams spec requirement). You need a custom producer or consumer. Something like this (an implicit ExecutionContext must be in scope for onSuccess):

val producer: ActorRef = ...
Duct[Future[Int]].foreach(_.onSuccess { case result => producer ! result })

On Wednesday, August 13, 2014, 22:15:01 UTC+4, Simeon Fitch wrote:

Another way of looking at my question might be this: How does one do zip, tee and merge like operations on Ducts and Flows before the materialization phase?

-- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
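The difference between keeping submission order and handing results to a consumer as they complete can be sketched with the Scala standard library alone. This is a minimal illustration, not akka-stream API: `CompletionOrderCollector`, `offer` and `CompletionOrderDemo` are hypothetical names, the collector stands in for the `producer` actor in the reply above, and `Future.sequence` is used only as an analogy for an order-preserving operation.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for the `producer` actor: it records values in the
// order the futures complete, not the order in which they were offered.
final class CompletionOrderCollector[T] {
  private var buffer = Vector.empty[T]

  // Register a callback that stores the value once the future succeeds.
  def offer(f: Future[T])(implicit ec: ExecutionContext): Unit =
    f.foreach(value => synchronized { buffer :+= value })

  def results: Vector[T] = synchronized(buffer)
}

object CompletionOrderDemo {
  // Runs callbacks on the completing thread so the demo is deterministic.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  // Returns (completion order, submission order) for two futures that
  // finish in the reverse of the order they were submitted.
  def run(): (Vector[Int], List[Int]) = {
    val first = Promise[Int]()
    val second = Promise[Int]()
    val collector = new CompletionOrderCollector[Int]
    collector.offer(first.future)
    collector.offer(second.future)

    second.success(2) // the later element finishes first
    first.success(1)

    // Future.sequence keeps the order of the input list.
    val inSubmissionOrder =
      Await.result(Future.sequence(List(first.future, second.future)), 1.second)
    (collector.results, inSubmissionOrder)
  }
}
```

Calling `CompletionOrderDemo.run()` yields the collector's view in completion order and the sequenced view in submission order, which is the trade-off the reply describes.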
[akka-user] Lost contact cluster client with heavy load
Hello all,

We have two clusters, cluster A and cluster B. Cluster A is a JMS consumer that uses ClusterClient to send messages over to B for processing. During load testing, we did the equivalent of a cold start, which caused cluster A to lose contact with cluster B. At this point, many messages had been delivered and cluster A was awaiting responses back from cluster B with the results.

08-13-2014 13:38:06,603 ClusterRemoteWatcher.apply$mcV$sp WARN - Detected unreachable: [akka.tcp://clust...@blah.com:24896]
08-13-2014 13:38:06,635 ClusterClient.apply$mcV$sp INFO - Lost contact with [Actor[akka.tcp://clust...@blah.com:24896/user/receptionist#-1548308322]], restablishing connection
08-13-2014 13:38:06,793 Remoting.apply$mcV$sp WARN - Tried to associate with unreachable remote address [akka.tcp://clust...@blah.com:24896]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: The remote system has a UID that has been quarantined. Association aborted.
08-13-2014 13:38:06,874 ClusterClient.apply$mcV$sp INFO - Connected to [akka.tcp://clust...@blah.com:24896/user/receptionist]

We then looked at cluster B's logs; it had also disassociated.

08/13/2014 13:38:06,763 WARN [ClusterB-akka.actor.default-dispatcher-5] Remoting - Tried to associate with unreachable remote address [akka.tcp://clust...@blah.com:23616]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.

My question is: once reconnected, how do we get the responses from cluster B sent back to cluster A? It appears that the responses going to dead letters prevents them from ever being sent back. Is there a way to get these responses sent back to the reconnected cluster?
[akka-user] akka-persistence AtLeastOnceDelivery deliveryId generation
Hi, I'm using akka-persistence for a project and so far it's been a fairly pleasant experience. One thing I noticed was that when using AtLeastOnceDelivery, I must use the deliveryIds generated by the trait. In some cases it would be nice if I could provide my own semantically meaningful value as the deliveryId instead. Of course this would also be a great opportunity for me to shoot myself in the foot - I'd have to make sure myself that those ids are unique, even across recipients - but that's solvable and it could make for some more elegant messages. Is there any other reason this is not possible/allowed? (Of course I could try to work around this by keeping track of a mapping between deliveryIds and my own meaningful ids, but that doesn't seem right at all :) ). Kind regards, Arnout
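The workaround mentioned at the end of the message, a mapping between generated deliveryIds and application-level ids, might look roughly like the sketch below. `DeliveryIdMapping` and its methods are hypothetical names, not part of Akka. The sketch assumes `register` is called from the function passed to `deliver` (which receives the generated deliveryId) and that the result of `confirm` is passed to `confirmDelivery`. The mapping is plain in-memory state, so a persistent actor would have to rebuild it during recovery.

```scala
// Hypothetical helper: remembers which application-level id belongs to each
// deliveryId handed out by AtLeastOnceDelivery.
final class DeliveryIdMapping[K] {
  private var byDeliveryId = Map.empty[Long, K]
  private var byKey = Map.empty[K, Long]

  // Record the pair when a message is sent; keys must be unique while pending.
  def register(deliveryId: Long, key: K): Unit = {
    require(!byKey.contains(key), s"duplicate key: $key")
    byDeliveryId += (deliveryId -> key)
    byKey += (key -> deliveryId)
  }

  // Look up the application-level id for a pending deliveryId.
  def keyFor(deliveryId: Long): Option[K] = byDeliveryId.get(deliveryId)

  // Called when a confirmation carrying the application-level id arrives.
  // Returns the deliveryId to confirm, or None if nothing is pending for it.
  def confirm(key: K): Option[Long] = {
    val deliveryId = byKey.get(key)
    deliveryId.foreach { id =>
      byDeliveryId -= id
      byKey -= key
    }
    deliveryId
  }

  // Application-level ids that have not been confirmed yet.
  def pending: Set[K] = byKey.keySet
}
```

The `require` in `register` is where the uniqueness concern from the message shows up: two unconfirmed messages may not share the same meaningful id.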
[akka-user] Re: how to tune akka remoting performance
I noticed performance issues when using Akka IO to create TCP and UDP servers (the application is a game server): high CPU usage and just overall poor performance. Being somewhat familiar with Netty, I put in a plain Netty version, and performance increased dramatically. I didn't take the time to find out exactly what was going on, as in my case Akka IO was just an extra layer anyway and going straight to Netty seemed like the right long-term approach. The best way to debug this kind of thing, in my experience, is to load up VisualVM and start looking at the app under load. That, coupled with a few thread dumps, should let you find the source of the problem without too much effort. Chris
[akka-user] Distributed Event Network design pattern?
Heya, I was wondering if someone on the list might be able to help me out. I'm trying to figure out what the best 'design pattern' would be to approach the following (and with my currently limited Akka knowledge it's bending my mind a little :p):

* A 'container' (actor system?). Initially there will be one, but this concept will likely expand to multiple disparate containers, not necessarily controlled by the same person (so secret cookie stuff wouldn't be useful between them)
* Inside the container there are unique nodes (actors?). Each node needs to be addressable directly (routed from its container?), but will handle entirely different things.
* Ideally the nodes will be able to be communicated with directly (actor selection or akka http?)
* For each node, there should be a guardian/receptionist as the single 'channel of contact'. This will consume the messages, check any required auth, and then send them into a protected internal system (second actor system running inside the guardian actor? or is there a better pattern?)

I don't think there is a need for the internal 'protected' actors to reply directly (as they could just send a new fire/forget event to the guardian on the other node), but if they wanted to, how would that fit in with the 'guardian' pattern?

Essentially I'm trying to conceptualise a distributed event network, where any node is capable of sending events to any other node, and each node can process those received events as it deems necessary (or ignore them if it so chooses).

Also, does Akka support a 'distributed' Pub/Sub (across disparate actor systems), or does it only work locally to a single actor system? As an aside, are there any good books/sites that have a bunch of different Akka design patterns? (aside from the ones scattered through the docs/etc) Thanks heaps!!
- Glenn / devalias
Re: [akka-user] What would it take to make actors in another language/platform
Thanks all for the replies; I didn't see them until just now. I assumed I would get notified by Groups when one came. At any rate, I am still quite interested in this as a possibility. For my purposes an Arduino or Propeller (Parallax) solution would be much more in line with my goals than a Pi, mostly because the Pi doesn't have a ton of I/O, which the application I am considering would need. Also, I am not intending to have the actors on the devices be standalone; they would need to communicate with a proper set of JVM-based Akka actors. I will look into other options, but I will also likely look into the Pi and see if I can make it fit my needs; it is nice that it can just run the JVM. On the topic of the Pi, what options would I have for communication? Specifically, would it be able to communicate over serial, or even better, could I feed it my own class for communication? (I'm likely going to be using XBee radios in API mode / doing ZigBee mesh. Really, Akka is ideal for this sort of situation; I just need to figure out how to get it working in my world :).) Thanks again. Josh

On Monday, August 4, 2014 3:10:23 AM UTC-7, Konrad Malawski wrote:

Hi Josh, We didn't yet publish/document our actor / cluster protocols very explicitly. We'd like to do this at some point, so perhaps other impls could talk to an Akka cluster – no plans on when this might happen though. This plays into an interesting story about exposing actors as services – we're not there yet though. As for Arduinos... I guess using plain C would be a better option there? There is an Akka-inspired C++ implementation, libcppa, around, but that's for in-memory use AFAIR. If you'd go down the road of making them a cluster you'll have to implement cluster membership etc. (depends on the use case again, but if they're in the field that's probably not a good idea if they're on GPRS etc.). I'm not sure about your use case, but perhaps using Raspberry Pis would fit this better? They're able to run a JVM nicely and you could use Akka on them directly. -- Cheers, Konrad 'ktoso' Malawski hAkker @ Typesafe http://typesafe.com
[akka-user] Re: Distributed Event Network design pattern?
I'm not qualified to address your questions directly, but I find a good place to start when looking for Akka patterns is the Typesafe Activator. Here http://typesafe.com/activator/template/akka-clustering is one on clustered pub/sub that might be interesting for you.

On Wednesday, August 13, 2014 4:07:15 PM UTC-7, Glenn / devalias wrote:

Heya, I was wondering if someone on the list might be able to help me out. I'm trying to figure out what the best 'design pattern' would be to approach the following (and with my currently limited Akka knowledge it's bending my mind a little :p):

* A 'container' (actor system?). Initially there will be one, but this concept will likely expand to multiple disparate containers, not necessarily controlled by the same person (so secret cookie stuff wouldn't be useful between them)
* Inside the container there are unique nodes (actors?). Each node needs to be addressable directly (routed from its container?), but will handle entirely different things.
* Ideally the nodes will be able to be communicated with directly (actor selection or akka http?)
* For each node, there should be a guardian/receptionist as the single 'channel of contact'. This will consume the messages, check any required auth, and then send them into a protected internal system (second actor system running inside the guardian actor? or is there a better pattern?)

I don't think there is a need for the internal 'protected' actors to reply directly (as they could just send a new fire/forget event to the guardian on the other node), but if they wanted to, how would that fit in with the 'guardian' pattern?

Essentially I'm trying to conceptualise a distributed event network, where any node is capable of sending events to any other node, and each node can process those received events as it deems necessary (or ignore them if it so chooses).

Also, does Akka support a 'distributed' Pub/Sub (across disparate actor systems), or does it only work locally to a single actor system? As an aside, are there any good books/sites that have a bunch of different Akka design patterns? (aside from the ones scattered through the docs/etc) Thanks heaps!!

- Glenn / devalias
[akka-user] Re: Cassandra Journal v2.0.3?
Hi Matthew, We also need to use the Akka Persistence journal plugin with an older version of Cassandra (v1.2.x); however, the available plugin works only with version 2.0.3 or higher. I came across your post: did you happen to implement or tweak the journal for an older version of Cassandra? If yes, would you share it with us or let us know what tweaks were required? Thanks for your help. --Ratika

On Tuesday, May 6, 2014 12:51:25 AM UTC+5:30, Matthew Howard wrote:

Has anyone implemented an akka persistence journal for older versions of Cassandra? I see the current journal is dependent on C* v2.0.3 or higher ( https://github.com/krasserm/akka-persistence-cassandra) but my app is currently on 1.1.9 and we are only actively planning to upgrade to v1.2 (just found this out - I thought we were moving to 2). I'm guessing there isn't one already out there, but thought I'd ask before attempting to implement one. Assuming I would need to implement it (probably a question for Martin directly), any warnings or recommendations? At first glance I'd obviously need to tweak the create keyspace/columnfamily commands (and change the driver), but I'm not seeing anything that appears to be too wildly dependent on C* v2.0.3 features. The handling of the partition_nr seems to be the biggest issue - I'm thinking we could just create the rowkey as a concatenation of the processor_id and partition_nr (e.g. myprocessor-0, myprocessor-1, etc... ). But I think/hope? otherwise the composite columns should work the same and I'm not going to get myself into a rabbit hole... Thanks in advance, Matt Howard
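The row-key idea from the quoted message (concatenating processor_id and partition_nr, e.g. myprocessor-0, myprocessor-1) can be sketched as below. `JournalRowKey` and its functions are hypothetical names, and the way a partition number is derived from a sequence number (fixed-size partitions, sequence numbers starting at 1) is an assumption for illustration, not something taken from the plugin's source.

```scala
// Hypothetical row-key scheme for a journal on an older Cassandra version.
object JournalRowKey {
  // Assumed layout: each partition holds `partitionSize` consecutive
  // sequence numbers, and sequence numbers start at 1.
  def partitionNr(sequenceNr: Long, partitionSize: Long): Long = {
    require(sequenceNr >= 1, "sequenceNr must be >= 1")
    require(partitionSize >= 1, "partitionSize must be >= 1")
    (sequenceNr - 1L) / partitionSize
  }

  // Concatenate processor id and partition number, e.g. "myprocessor-0".
  def rowKey(processorId: String, partitionNr: Long): String =
    s"$processorId-$partitionNr"

  // Row key for the partition that contains the given sequence number.
  def rowKeyFor(processorId: String, sequenceNr: Long, partitionSize: Long): String =
    rowKey(processorId, partitionNr(sequenceNr, partitionSize))
}
```

One thing to watch with a plain concatenation: a processor id that itself contains `-` can only be split back unambiguously at the last `-`, since the partition number never contains one.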
[akka-user] Akka Persistence: LogProducer and LogConsumer?
I've been thinking about how Akka Persistence could possibly be improved for CQRS and *persistent* stream or log processing more generally. Here's my stream (no pun intended) of thought: Currently, PersistentActor is focussed on persisting state changes for an Actor (persistenceId) but why couldn't that be extended to just persisting a stream of events (or more generally messages) with the option to skip the reconstruction of state, i.e. PersistentStreamProducer. Similarly, PersistentView could be extended to consume a persistent stream of messages (keeping track of where it was up to, with the option to start again if needed), i.e. PersistentStreamConsumer. It should be possible to label these persistent streams with something other than a persistenceId (if appropriate), i.e. something like a topic. And finally, the most important for CQRS, PersistentStreamConsumer shouldn't need to be tied to a real PersistentStreamProducer. It should be possible to create (perhaps with a DSL in Scala or separately in the store itself, or elsewhere) virtual streams (projections) based on PersistentStreamProducer(s), PSP types, message types, etc. Greg Young's Event Store would seem to be a great store for facilitating most (if not all) of this, with its projections functionality. Kafka would seem to be an even more scalable solution but (AFAIK) currently lacks the ability to create virtual streams (i.e. virtual logs). I think this approach is somewhat different from the current Akka Streams, as far as I understand them, which seem to be run-time only streams. Perhaps these actors would be better named PersistentLogProducer and PersistentLogConsumer (or just LogProducer and LogConsumer). Or they could just be traits. In that case PersistentActor would be Actor with LogProducer and PersistentView would be Actor with LogConsumer. Perhaps this can (almost) already be done with PersistentActor and PersistentView? 
I'd be very interested to know if the current PersistentViews could somehow use virtual persistenceIds resulting from projections created in Event Store. I assume the current journals create topics for each PersistentActor persistenceId and the PersistentViews just look them up. So if it was possible to create a projection in Event Store with a synthetic persistenceId then perhaps a PersistentView could just find it, believing it to be a journal for a real PersistentActor. Your thoughts?

I note there is already:
1. An Akka client for Event Store (EventStore.JVM)
2. An early Event Store plugin for Akka Persistence
3. An Akka consumer for Kafka (akka-kafka)
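To make the proposed split a little more concrete, here is a minimal in-memory model of the two traits. Everything in it is hypothetical (`InMemoryLog`, `LogProducer`, `LogConsumer` and their methods); it is not Akka Persistence API and involves no actors or journals. It only shows a producer appending to a shared log and a consumer that keeps track of where it was up to, with the option to start again.

```scala
// Hypothetical append-only log shared by producers and consumers.
final class InMemoryLog[E] {
  private var entries = Vector.empty[E]

  // Appends an event and returns the new length of the log.
  def append(event: E): Long = synchronized {
    entries :+= event
    entries.size.toLong
  }

  // Returns all events at or after the given zero-based offset.
  def readFrom(offset: Long): Vector[E] =
    synchronized(entries.drop(offset.toInt))
}

// Writes events to a log without rebuilding any state from it.
trait LogProducer[E] {
  def log: InMemoryLog[E]
  def produce(event: E): Long = log.append(event)
}

// Reads a log incrementally, remembering how far it has got.
trait LogConsumer[E] {
  def log: InMemoryLog[E]
  def handle(event: E): Unit

  private var offset = 0L

  // Handles every event not seen yet and returns how many were handled.
  def poll(): Int = {
    val batch = log.readFrom(offset)
    batch.foreach(handle)
    offset += batch.size
    batch.size
  }

  // Start again from the beginning of the log.
  def restart(): Unit = offset = 0L
}
```

In this model a consumer depends only on a log, not on any particular producer, which is the decoupling the message asks for; a projection or virtual stream would be another `InMemoryLog` derived from one or more source logs.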
Re: [akka-user] akka-persistence cassandra journal plugin api
On 13.08.14 17:42, ratika.pra...@razorthink.net wrote:

Hi, I am using the journal plugin for Akka Persistence listed here: https://github.com/krasserm/akka-persistence-cassandra/. However, I see it mentioned that it works only with Cassandra 2.0.3 and higher, and the Cassandra configured in our project is version 1.2.13.2. Does that mean we would be unable to use this?

Yes, in this case you won't be able to use the Cassandra journal plugin. It doesn't support Cassandra 1.2.x.

After configuring the Cassandra-based journal plugin, when I start our Akka application I see the error below (the configured Cassandra host has a version lower than 2.0.3). Kindly help

akka.actor.ActorInitializationException: exception during creation
	at akka.actor.ActorInitializationException$.apply(Actor.scala:164) ~[akka-actor_2.10-2.3.4.jar:na]
	at akka.actor.ActorCell.create(ActorCell.scala:596) ~[akka-actor_2.10-2.3.4.jar:na]
	at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.4.jar:na]
	at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.4.jar:na]
	at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.4.jar:na]
	at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.4.jar:na]
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na]
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na]
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 2:29 missing '=' at 'EXISTS'
	at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:35) ~[cassandra-driver-core-2.0.3.jar:na]
	at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256) ~[cassandra-driver-core-2.0.3.jar:na]
	at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:172) ~[cassandra-driver-core-2.0.3.jar:na]
	at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52) ~[cassandra-driver-core-2.0.3.jar:na]
	at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36) ~[cassandra-driver-core-2.0.3.jar:na]
	at akka.persistence.cassandra.journal.CassandraJournal.init(CassandraJournal.scala:52) ~[akka-persistence-cassandra_2.10-0.3.3.jar:0.3.3]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.7.0_51]
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) ~[na:1.7.0_51]
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.7.0_51]
	at java.lang.reflect.Constructor.newInstance(Constructor.java:526) ~[na:1.7.0_51]
	at java.lang.Class.newInstance(Class.java:374) ~[na:1.7.0_51]
	at akka.util.Reflect$.instantiate(Reflect.scala:45) ~[akka-actor_2.10-2.3.4.jar:na]
	at akka.actor.NoArgsReflectConstructor.produce(Props.scala:361) ~[akka-actor_2.10-2.3.4.jar:na]
	at akka.actor.Props.newActor(Props.scala:252) ~[akka-actor_2.10-2.3.4.jar:na]
	at akka.actor.ActorCell.newActor(ActorCell.scala:552) ~[akka-actor_2.10-2.3.4.jar:na]
	at akka.actor.ActorCell.create(ActorCell.scala:578) ~[akka-actor_2.10-2.3.4.jar:na]
	... 9 common frames omitted
com.datastax.driver.core.exceptions.SyntaxError: line 2:29 missing '=' at 'EXISTS'
	at com.datastax.driver.core.Responses$Error.asException(Responses.java:95) ~[cassandra-driver-core-2.0.3.jar:na]
	at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:108) ~[cassandra-driver-core-2.0.3.jar:na]
	at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235) ~[cassandra-driver-core-2.0.3.jar:na]
	at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:367) ~[cassandra-driver-core-2.0.3.jar:na]
	at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:582) ~[cassandra-driver-core-2.0.3.jar:na]
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.9.0.Final.jar:na]
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.9.0.Final.jar:na]
	at