Re: [akka-user] Akka Persistence - Views with multiple processors
On Sun, Apr 20, 2014 at 6:05 PM, Olger Warnier ol...@spectare.nl wrote:
On Sunday, April 20, 2014 4:59:22 PM UTC+2, Patrik Nordwall wrote:
On Sun, Apr 20, 2014 at 2:47 PM, Olger Warnier ol...@spectare.nl wrote:

Hi Patrick, Sounds like an interesting approach; storing some meta-data at the view may help to check / show the reliability of the system. At this moment the events are sent to a processor per node that publishes the event (distributed pub sub).

That sounds good, as well.

When you talk about view, is that the akka-persistence view?

Yes, persistence.View and persistence.Processor.

So more or less, the sub processors could send messages to the View and when there is a Persist() around it, it will be stored.

I'm not sure I understand what you mean here. Let me clarify my proposal with an example. Let's say we have a User aggregate root with some profile information that can be updated. The user is represented by a User EventsourcedProcessor actor, which is sharded. On the query side we want to be able to search users by first and last name, i.e. we want to store all users in a relational database table on the query side.

Yup, great sample.

The User actor persists FirstNameChanged, and inside the persist block it sends a Persistent(FirstNameChanged) message to the AllUsers Processor. On the query side we have an AllUsersView connected to that processor. When AllUsersView receives FirstNameChanged it updates the db table.

Indeed. In what way is the AllUsersView connected to that Processor? (in a distributed pub sub situation)

In a persistent View (http://doc.akka.io/docs/akka/2.3.2/scala/persistence.html#Views) you define the processorId that it will read the persistent messages from. It reads (replays) from the journal periodically, or when you send it an Update message. You can have many views connected to the same processor. The processor doesn't have to know anything about the views.

In a distributed setup you will use a distributed/replicated journal (http://akka.io/community/) and thereby the view can be located on another machine than the processor.

(although, I have to understand in what way 'inside the persist block' is to be interpreted.)

Ah, I thought you were familiar with EventsourcedProcessor. Read up on it in the docs: http://doc.akka.io/docs/akka/2.3.2/scala/persistence.html#Event_sourcing

To handle lost messages between User and AllUsers you might want to send an acknowledgement from AllUsers to User, and have a retry mechanism in User. I would implement that myself in User, but PersistentChannel might be an alternative.

Is it possible to do persistent channels with the distributed pub sub stuff that's available in akka?

Yes, PersistentChannel requires a confirmation from the destination, so if you wrap/forward the ConfirmablePersistent and send it via pub-sub it should be fine. It will not work if you publish to multiple subscribers.

That is the most straightforward solution. The drawback is that FirstNameChanged is stored twice. Therefore I suggested the meta-data alternative. User sends Persistent(UserChangedNotification(processorId)) to the AllUsers Processor. When AllUsersView receives UserChangedNotification it creates a child actor, a View for the processorId in the UserChangedNotification, if it doesn't already have such a child. That view would replay all events of the User and can update the database table. It must keep track of how far it has replayed/stored in db, i.e. seqNr must be stored in the db. The child View can be stopped when it becomes inactive.

Will that work with a sharded cluster? (and a 'View' may be running on another node)

Yes.

That alternative is more complicated, and I'm not sure it is worth it.

From a solution perspective, using the distributed pub sub, maybe with persistent channels, is what will do. Most of my questions have to do with using akka-persistence as a full-fledged DDD framework. That is not too hard without the sharding (although a view for every aggregate root instance seems not to fit when you want to use that for database connectivity that contains a view model). With the sharding it is more complicated, and a good structure to actually build a view that is on 'some' node listening for events, doing its thing, is a handy part. Thanks for your thoughts.

I'm sure patterns and tools around this will evolve from the experience of using akka persistence in real applications.

Cheers, Patrik

Cheers, Olger

Cheers, Patrik

Is that a correct understanding? Kind regards, Olger

On Sunday, April 20, 2014 2:32:07 PM UTC+2, Patrik Nordwall wrote:

Hi Olger, What if you keep the sharded event sourced actors (+10k), but let them also send the events to one or a few processors. Then you can connect the views/streams to these processors. If you don't like storing the
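The first proposal in this thread (User persists an event, forwards it to a single AllUsers processor, and a view replays that processor's journal) can be sketched roughly as follows. This is only an illustration, assuming the Akka 2.3.x persistence API linked in the thread (EventsourcedProcessor, Processor, View, Persistent), which was replaced in later Akka versions. The names ChangeFirstName, the "all-users" processorId and the comments about the database are hypothetical; acknowledgement and retry, which Patrik recommends, are left out.

```scala
import akka.actor.ActorRef
import akka.persistence.{EventsourcedProcessor, Persistent, Processor, View}

// Hypothetical command and event for the User aggregate.
final case class ChangeFirstName(name: String)
final case class FirstNameChanged(userId: String, name: String)

// Write side: one sharded instance per user (sharding setup omitted).
class User(userId: String, allUsers: ActorRef) extends EventsourcedProcessor {
  override def processorId: String = s"user-$userId"
  private var firstName = ""

  def receiveRecover: Receive = {
    case FirstNameChanged(_, name) => firstName = name
  }

  def receiveCommand: Receive = {
    case ChangeFirstName(name) =>
      persist(FirstNameChanged(userId, name)) { evt =>
        firstName = evt.name
        // "Inside the persist block": forward the stored event so that
        // the AllUsers processor journals a second copy of it.
        allUsers ! Persistent(evt)
      }
  }
}

// Collects the events of all users under one processorId.
class AllUsers extends Processor {
  override def processorId: String = "all-users"
  def receive: Receive = {
    case Persistent(_, _) => () // journaled before it is delivered here
  }
}

// Query side: replays the "all-users" journal, possibly on another node.
class AllUsersView extends View {
  override def processorId: String = "all-users"
  def receive: Receive = {
    case Persistent(FirstNameChanged(userId, name), sequenceNr) =>
      // Hypothetical: update the relational table here and store
      // sequenceNr so that a restart can resume from it.
      ()
  }
}
```

The drawback Patrik mentions is visible in the sketch: FirstNameChanged ends up in both the user's journal and the "all-users" journal.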
Re: [akka-user] Re: [persistence] in memory journal for testing
Hi Tim,

As Todd mentioned, there is a community-maintained in-memory journal, but that is not the same as the one you found in the akka sources. We have not promoted (documented) the akka in-memory journal because we are not sure that it is ready for general consumption and the maintenance obligations that come with that. We use it in some tests of akka-persistence.

Regards, Patrik

On Sun, Apr 20, 2014 at 11:18 PM, Todd Nist tsind...@gmail.com wrote:

I don't believe it is hidden, well, not completely at least. There is a reference in the Akka documentation (http://doc.akka.io/docs/akka/snapshot/scala/persistence.html) to the Community Contribution (http://akka.io/community/) journals provided, which is where the in-memory one is listed as well as the others.

On Sunday, April 20, 2014 11:59:19 AM UTC-4, Tim Pigden wrote:

I'm using 2.3. I eventually stumbled across this (more or less):

akka.persistence.journal.plugin = akka.persistence.journal.inmem

from the reference configuration file for snapshot 2.4. Is there any particular reason this is hidden away with no mention in the docs?

-- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.

-- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw
JOIN US. REGISTER TODAY! Scala Days, June 16th-18th, Berlin http://www.scaladays.org/
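For test code, the journal setting Tim found can be supplied programmatically instead of through application.conf. The following is a minimal sketch, assuming akka-actor, akka-persistence and Typesafe Config are on the classpath; the object name and the system name "persistence-test" are made up for illustration. As Patrik notes above, the built-in journal is undocumented and not promoted for general use, so treat this as a test-only convenience.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object InMemJournalTestConfig {
  // Plugin id as quoted in the thread; everything else falls back to the
  // regular configuration (reference.conf / application.conf).
  val config: Config = ConfigFactory
    .parseString(
      """
        |akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
        |""".stripMargin)
    .withFallback(ConfigFactory.load())

  // Hypothetical helper: one fresh actor system per test.
  def newSystem(): ActorSystem = ActorSystem("persistence-test", config)
}
```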
[akka-user] can't get community in-memory-journal to work
Hi,

I can't get the in-memory-journal to configure. The documentation says simply use:

akka.persistence.journal.plugin = in-memory-journal

but this gives me an error:

default-akka.actor.default-dispatcher-4 No configuration setting found for key 'in-memory-journal'
akka.actor.ActorInitializationException: exception during creation

It was in trying to resolve that that I found the hidden internal one, which is initialised like this:

akka.persistence.journal.plugin = akka.persistence.journal.inmem

I'm using Akka 2.3.0, by the way. Any suggestions (other than to continue using the akka one for testing)?
Re: [akka-user] 'Scatter-GatherAllDone' pattern questions
Hi,

On Fri, Apr 18, 2014 at 10:55 AM, Alexey Shuksto seig...@gmail.com wrote:

Hello fellow hAkkers,

In our project we use the same business logic several times:

1. Manager actor `M` receives work order `WorkOrder(n, time)`;
2. `M` distributes work over `n` worker actors `W` (`n` differs mostly);
3. After all work is 'done' or the specified amount of `time` has passed, some entity must gather all 'done' work, reduce it and send it further down the workflow;
4. By 'done' I mean that `W` could either do its work correctly, or fail internally, or just not do its work in `time` -- all is perfectly normal.

Right now it is done via a third gatherer object `G`:

1. `G` is an FSM with 2 states: `Wait` and `Gather`;
2. `G`s are instantiated in the `M` context and every `M` actor holds a fixed-size queue of `G` actors in `Wait` state (to prevent a flood of work orders);
3. When distributing workload, `M` also sends the work order to `G` and orders every `W` to forward results to `G`;
4. On receipt of the work order, `G` switches to `Gather` state `forMax` `time`;
5. After receiving every work result, `G` checks if all necessary work was done, and if it is, sends itself a `Done` message;
6. On a `Done | StateTimeout` message, `G` reduces all received results, sends the result further down, switches to `Wait` state and sends itself to the `M` actor to be placed in the queue.

While this particular scheme works well enough, I was wondering -- maybe we overlooked an easier and more straightforward one?

That looks good.

For example, we could implement `G` not as an FSM, but as a simple 'short-living' actor which destroys itself after reduce,

Yes, that is how I would do it, and only do the pooling if I had evidence that it is needed for performance reasons.

but this produces several questions:

1. What would be the CPU, memory and GC overhead of continually creating/destroying, say, 1k actors per second?

Try it, measure! We have previously measured actor creation to 20 micro seconds (average of 10 actors).

2. Does `ActorContext.actorOf(..)` block for the entire actor creation time, or just for time enough to produce an ActorRef for a future actor?

The construction is async (the constructor will potentially run in another thread).

3. What to do with the (possibly high) number of `W` messages to `deadLetters` with results obtained after `time` had passed and the corresponding `G` actor is stopped?

Well, if that is a problem for some reason you can keep it alive for some extra time to absorb late messages, e.g. by setting receiveTimeout.

/Patrik
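The short-lived gatherer that Patrik prefers, including his suggestion to linger for a while so that late results do not end up in `deadLetters`, might look roughly like this. It is a sketch using the classic actor API; `WorkResult`, `Reduced`, `Deadline`, the `linger` parameter and the summing reduce step are hypothetical stand-ins for whatever the real workflow uses.

```scala
import akka.actor.{Actor, ActorRef, Cancellable, ReceiveTimeout}
import scala.concurrent.duration._

// Hypothetical protocol: workers send WorkResult, downstream gets Reduced.
final case class WorkResult(value: Int)
final case class Reduced(total: Int, received: Int)
case object Deadline

// One Gatherer per work order; it stops itself when it is no longer needed.
class Gatherer(expected: Int, time: FiniteDuration, linger: FiniteDuration, next: ActorRef)
    extends Actor {
  import context.dispatcher

  // Fires once if not all results have arrived within `time`.
  private val deadline: Cancellable =
    context.system.scheduler.scheduleOnce(time, self, Deadline)
  private var results = Vector.empty[WorkResult]

  def receive: Receive = gathering

  private def gathering: Receive = {
    case r: WorkResult =>
      results :+= r
      if (results.size >= expected) finish()
    case Deadline => finish()
  }

  // After reducing, stay alive and swallow late results until the
  // actor has been idle for `linger`.
  private def draining: Receive = {
    case ReceiveTimeout => context.stop(self)
    case _              => () // late WorkResult or stale Deadline
  }

  private def finish(): Unit = {
    deadline.cancel()
    next ! Reduced(results.map(_.value).sum, results.size)
    context.setReceiveTimeout(linger)
    context.become(draining)
  }
}
```

Note that the receive timeout restarts with every late message, so a gatherer that keeps receiving stragglers stays alive until it has been idle for `linger`.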
Re: [akka-user] can't get community in-memory-journal to work
doh! RTFM: I didn't read the bit about publishLocal and assumed IntelliJ had gone and got it from a repo. Thanks.

On 21 April 2014 09:16, Patrik Nordwall patrik.nordw...@gmail.com wrote:

Have you added the dependency (jar in classpath)? Otherwise, raise an issue in the github repo of the external in-memory journal plugin. /Patrik

-- Tim Pigden, Optrak Distribution Software Limited, http://optrak.com
Re: [akka-user] SingleConsumerOnlyUnboundedMailbox
Hi Aaron,

withMailbox refers to a configuration section of the mailbox type, i.e. use this instead:

withMailbox("single-consumer-mailbox")

and in your application.conf:

single-consumer-mailbox { mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" }

Cheers, Patrik

On Fri, Apr 18, 2014 at 2:47 AM, Aaron Schlesinger arsch...@gmail.com wrote:

Hello - I'm trying to spawn an actor that has the aforementioned mailbox. The docs seem clear that I can specify , but I'm getting a ConfigurationException on spawn. The code in the gist below fails with a ConfigurationException, and when I add mailbox-type = akka.dispatch.SingleConsumerOnlyUnboundedMailbox to my config, the property seems to get ignored (according to props.mailbox). Does anyone know what's going on here? https://gist.github.com/arschles/11018808

Thanks in advance, Aaron
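Put together, Patrik's answer amounts to naming a config section and passing that name to withMailbox. A minimal sketch, assuming akka-actor on the classpath; the Echo actor, the object name and the system and actor names are hypothetical, and the config is parsed inline only to keep the example in one file (in a real project it would live in application.conf).

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

object MailboxExample {
  // Hypothetical actor, only here to have something to spawn.
  class Echo extends Actor {
    def receive: Receive = { case msg => sender() ! msg }
  }

  // The section name is arbitrary; withMailbox refers to it by name.
  val config: Config = ConfigFactory
    .parseString(
      """
        |single-consumer-mailbox {
        |  mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
        |}
        |""".stripMargin)
    .withFallback(ConfigFactory.load())

  def start(): (ActorSystem, ActorRef) = {
    val system = ActorSystem("mailbox-example", config)
    // Pass the name of the config section, not the mailbox class name.
    val echo = system.actorOf(
      Props(new Echo).withMailbox("single-consumer-mailbox"), "echo")
    (system, echo)
  }
}
```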
[akka-user] akka streams infinite data source example
Hi all,

I am very excited by akka streams -- it aims to solve a problem that I see time and time again. Every time I post to this list it feels like the solution is always "wait until Akka Streams is released". Finally, it is here!

I intend to read the documentation fully, but I was a little disappointed that the activator examples did not have a simple example with an (effectively) infinite data source that can only be polled in serial, with parallel (but controllably finite) consumers.

Is there any chance of an example along these lines?

A month or so ago, I asked the same of the RxJava community and it turned out that it was a work-in-progress... so I created this little example comparing various approaches (I didn't write an Akka Actor implementation because it is quite obvious that it would just OOM): https://github.com/fommil/rx-playground/blob/master/src/main/scala/com/github/fommil/rx/scratch.scala

The `ProducerObservableParser` reads in a CSV file one line at a time (the file is far too big to hold in memory), and then processes N rows in parallel, only reading more lines as the consumers finish each row. There is never more than a bounded number of rows in memory at any given point in time.

The RxJava POC Observable is here: https://github.com/fommil/rx-playground/blob/master/src/main/scala/com/github/fommil/rx/producers.scala

But what is the equivalent Akka Streams code? The BasicTransformation example reads in the whole text before flowing it, and I couldn't see anything where the consuming was happening in parallel.

Best regards, Sam
Re: [akka-user] akka streams infinite data source example
Hi Sam,

On 21 Apr 2014, at 12:28, Sam Halliday sam.halli...@gmail.com wrote:

Hi all, I am very excited by akka streams -- it aims to solve a problem that I see time and time again. Every time I post to this list it feels like the solution is always "wait until Akka Streams is released". Finally, it is here!

Yeah, exciting times.

I intend to read the documentation fully, but I was a little disappointed that the activator examples did not have a simple example with an (effectively) infinite data source that can only be polled in serial, with parallel (but controllably finite) consumers.

Isn't that demonstrated with the random number generator source, and its slow consumers?

Is there any chance of an example along these lines? A month or so ago, I asked the same of the RxJava community and it turned out that it was a work-in-progress... so I created this little example comparing various approaches (I didn't write an Akka Actor implementation because it is quite obvious that it would just OOM): https://github.com/fommil/rx-playground/blob/master/src/main/scala/com/github/fommil/rx/scratch.scala

The `ProducerObservableParser` reads in a CSV file one line at a time (the file is far too big to hold in memory), and then processes N rows in parallel, only reading more lines as the consumers finish each row. There is never more than a bounded number of rows in memory at any given point in time.

That sounds very doable with akka streams. You can control the buffer sizes with the settings of the materializer. A consumer always signals upstream how many more elements it can handle, and the producer is not allowed to send more elements downstream than what was requested.

The RxJava POC Observable is here: https://github.com/fommil/rx-playground/blob/master/src/main/scala/com/github/fommil/rx/producers.scala

But what is the equivalent Akka Streams code? The BasicTransformation example reads in the whole text before flowing it, and I couldn't see anything where the consuming was happening in parallel.

BasicTransformation defines the input text in code (to make it simple), but the iterator next() is not called more than what can be consumed downstream. Isn't the log file sample more similar to your text file input? It does not read the whole file (if it was large) into memory.

/Patrik

Best regards, Sam
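The demand rule Patrik describes (the consumer says how many more elements it can take, and the producer never reads ahead of that) can be illustrated with a toy model in plain Scala. This is deliberately not the Akka Streams API; `PullSource`, `request` and `pulledSoFar` are made-up names, and everything runs synchronously on one thread.

```scala
// Toy model of demand signalling: the source only touches the underlying
// iterator when the consumer has asked for elements.
final class PullSource[T](elements: Iterator[T]) {
  private var pulled = 0

  /** Number of elements read from the underlying iterator so far. */
  def pulledSoFar: Int = pulled

  /** Delivers at most `demand` elements and never reads ahead of demand. */
  def request(demand: Int): List[T] = {
    val out = List.newBuilder[T]
    var remaining = demand
    while (remaining > 0 && elements.hasNext) {
      out += elements.next()
      pulled += 1
      remaining -= 1
    }
    out.result()
  }
}

object PullSourceDemo {
  def main(args: Array[String]): Unit = {
    // Effectively infinite input, like the lines of a very large file.
    val rows = Iterator.from(1).map(n => s"row-$n")
    val source = new PullSource(rows)

    val batch = source.request(4) // the consumer has room for four rows
    println(batch)
    println(source.pulledSoFar)   // only what was requested has been read
  }
}
```

Because the iterator is only advanced inside `request`, the number of rows held in memory is bounded by the outstanding demand, which is the property Sam asks about.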
Re: [akka-user] Reactive / Akka streams of ByteStrings
On 20 Apr 2014, at 00:32, Daniel Armak danar...@gmail.com wrote:

Hi,

How would I use reactive streams in general, and/or akka streams in particular, to represent a stream of binary data? I'm thinking of all binary streams that don't have a natural division into chunks of a fixed (small) size; you can divide them into chunks any way you want and all processors in the pipeline will still work. Like downloading a big file over HTTP, decompressing and saving it to disk.

The natural representation is a stream of ByteString (or byte[] in Java). But each ByteString can be arbitrarily large.

Can it? Wouldn't it be good if a ByteString producer has a chunk size limit as part of its contract? Then a chunk corresponds to 1 element. /Patrik

It's no good to tell the producer I'm willing to accept one more element, if I have no idea what size it's going to be. Maybe the producer is reading from a 100MB ByteString it already has in memory, and the easiest thing for it to do (i.e. the easiest way for a programmer to code the producer) is to send all of the data as one element.

What I really want is to tell it how many more bytes (or characters, etc) I'm willing to accept, but in the current reactive streams API that would require sending each byte in a separate call to onNext. Some of the implementations might address this scenario, but it seems to me that this will be a common use case and so standardization and interoperability would be of value. I'm sure this has all been thought of. What's the recommended usage pattern?

Also, some processors do have a natural chunk size (e.g. compression, encryption, network packet transmission). This size will rarely match the size of the incoming chunks (if only because different processors in a pipeline have different chunk sizes). To maintain efficiency it might be desirable for some processors to buffer their output and only forward chunks of discrete sizes. This, too, might benefit from typed declarations of the chunk sizes each processor prefers or requires as input. Otherwise the programmer will have to configure each processor manually to do the right amount of buffering, because the processor won't be automatically aware of the preferences of the next processor in the pipeline. This is a more complex scenario, so I expect it will be left to each implementation to introduce its own patterns.

Daniel Armak
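Both ideas in this message can be sketched without any stream library: Patrik's contract that a producer never emits more than a maximum chunk size, and Daniel's processor that buffers incoming chunks and forwards only blocks of its own preferred size. The code below uses plain Scala collections rather than akka.util.ByteString; `Chunking`, `boundedChunks` and `Rechunker` are hypothetical names.

```scala
object Chunking {
  /** Producer side: split data so that no element exceeds `maxChunk` bytes. */
  def boundedChunks(data: Array[Byte], maxChunk: Int): Iterator[Array[Byte]] = {
    require(maxChunk > 0, "maxChunk must be positive")
    data.grouped(maxChunk)
  }
}

/** Processor side: accepts chunks of any size, emits blocks of exactly `blockSize`. */
final class Rechunker(blockSize: Int) {
  require(blockSize > 0, "blockSize must be positive")
  private var buffer = Vector.empty[Byte]

  /** Adds a chunk and returns every complete block that is now available. */
  def push(chunk: Seq[Byte]): List[Vector[Byte]] = {
    buffer ++= chunk
    val complete = buffer.length / blockSize * blockSize
    val (ready, rest) = buffer.splitAt(complete)
    buffer = rest
    ready.grouped(blockSize).toList
  }

  /** Returns the trailing partial block, if any, at end of stream. */
  def flush(): Option[Vector[Byte]] = {
    val out = if (buffer.isEmpty) None else Some(buffer)
    buffer = Vector.empty
    out
  }
}
```

With a bounded chunk size upstream, "one more element" has a known worst-case cost in bytes, and the rechunker never buffers more than one chunk plus an incomplete block.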
Re: [akka-user] akka streams infinite data source example
On Monday, 21 April 2014 12:32:57 UTC+1, Patrik Nordwall wrote:

I intend to read the documentation fully, but I was a little disappointed that the activator examples did not have a simple example with an (effectively) infinite data source that can only be polled in serial, with parallel (but controllably finite) consumers.

Isn't that demonstrated with the random number generator source, and its slow consumers?

I missed that one. How many consumers are there at any given moment? Is it in here somewhere? https://github.com/typesafehub/activator-akka-stream-scala/tree/master/src/main/scala/sample/stream

My example is trying to simulate real world examples of:

* parsing loads of data coming from a single data source (e.g. indexing a multi-TB database with Lucene, running in under 1GB)
* parallel finite element calculations, where there are a lot more elements than bytes of RAM so they have to be batched (and with minimal object churn)

BasicTransformation defines the input text in code (to make it simple), but the iterator next() is not called more than what can be consumed downstream. Isn't the log file sample more similar to your text file input? It does not read the whole file (if it was large) into memory.

Right, so you pass an Iterator[String] to the flow. Yes, that looks good, sorry I missed it.

But Iterator[T] is a little too ill-defined near the end of the stream (that's why I created my own Producer in the RxJava playground). For example, does it block on hasNext or on next if it knows there are more elements that are not yet available, or does it close up and go home? Traditional Java APIs (such as Queues) would actually return early if a queue was exhausted, instead of waiting for a poison pill.

In any case, if Flow can handle an Iterator that blocks (e.g. for I/O), it's probably good enough for most situations.

It would be even better if it knew how often to poll the data source... for example I have an EEG (brain scanner) library which has to poll the device at 57Hz. If it does it too quickly, there are inadequacies in the underlying hardware which result in busy spinning (yes, it's insane, and it really does eat the whole CPU)... but if I don't poll quickly enough then data can be lost. Relevant code (and my non-stream hack) here: https://github.com/fommil/emokit-java/blob/master/src/main/java/com/github/fommil/emokit/EmotivHid.java#L84

Best regards, Sam
Re: [akka-user] akka streams infinite data source example
21 apr 2014 kl. 13:55 skrev Sam Halliday sam.halli...@gmail.com: On Monday, 21 April 2014 12:32:57 UTC+1, Patrik Nordwall wrote: I intend to read the documentation fully, but I was a little disappointed that the activator examples did not have a simple example with an (effectively) infinite data source that can only be polled in serial, with parallel (but controllably finite) consumers. Isn't that demonstrated with the random number generator source, and its slow consumers? I missed that one. How many consumers are there at any given moment? It has one consumer but two filter steps that can execute pipelined. You can attach several consumers with toProducer, and then start several flows from that. Backpressure works with multiple consumers also. Is it in here somewhere? https://github.com/typesafehub/activator-akka-stream-scala/tree/master/src/main/scala/sample/stream https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/WritePrimes.scala My example is trying to simulate real world examples of: * parsing loads of data coming from a single data source (e.g. indexing a multi-TB database with Lucene, running in under 1GB) * parallel finite element calculations, where there are a lot more elements than bytes of RAM so they have to be batched (and with minimal object churn) BasicTransformation defines the input text in code (to make it simple), but the iterator next() is not called more than what can be consumed downstream. Isn't the log file sample more similar to your text file input? It does not read the whole file (if it was large) into memory. Right, so you pass an Iterator[String] to the flow. Yes, that looks good, sorry I missed it. But Iterator[T] is a little too ill-defined near the end of the stream (that's why I created my own Producer in the RxJava playground). For example, does it block on hasNext or on next if it knows there are more elements that are not yet available, or does it close up and go home? 
Traditional Java APIs (such as Queues) would actually return early if a queue was exhausted, instead of waiting for a poison pill. In any case, if Flow can handle an Iterator that blocks (e.g. for I/O), it's probably good enough for most situations. Ah, I see what you mean. Blocking hasNext/next doesn't sound attractive to me. That should probably be another Producer, that can do the polling. /Patrik It would be even better if it knew how often to poll the data source... for example I have an EEG (brain scanner) library which has to poll the device at 57Hz. If it does it too quickly, there are inadequacies in the underlying hardware which result in busy spinning (yes, it's insane, and it really does eat the whole CPU)... but if I don't poll quickly enough then data can be lost. Relevant code (and my non-stream hack) here: https://github.com/fommil/emokit-java/blob/master/src/main/java/com/github/fommil/emokit/EmotivHid.java#L84 Best regards, Sam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
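For readers who want to see the shape of "a source polled in serial, with a bounded number of parallel consumers" outside of any streams library, here is a minimal plain-JVM sketch. It is not the Akka Streams API and not code from the activator template; `BoundedConsumersSketch`, `processBounded` and `maxInFlight` are names made up for this illustration. The iterator is advanced only on the calling thread, and only once a worker slot is free, so it is never pulled faster than the consumers can keep up.

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}

object BoundedConsumersSketch {
  /**
   * Pulls from `source` on the calling thread only (serial polling) and hands each
   * element to a worker pool, with at most `maxInFlight` elements being processed
   * at any time. Returns the highest number of concurrently running handlers seen.
   */
  def processBounded[A](source: Iterator[A], maxInFlight: Int)(handle: A => Unit): Int = {
    require(maxInFlight > 0, "maxInFlight must be positive")
    val permits = new Semaphore(maxInFlight)
    val pool    = Executors.newFixedThreadPool(maxInFlight)
    val lock    = new Object
    var running = 0
    var peak    = 0
    try {
      while (source.hasNext) {
        permits.acquire()            // do not call next() until a worker slot is free
        val element = source.next()
        pool.execute(new Runnable {
          def run(): Unit = {
            lock.synchronized { running += 1; peak = math.max(peak, running) }
            try handle(element)
            finally {
              lock.synchronized { running -= 1 }
              permits.release()      // frees the slot, letting the poller advance
            }
          }
        })
      }
      permits.acquire(maxInFlight)   // wait until every handler has finished
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
    lock.synchronized(peak)
  }

  def main(args: Array[String]): Unit = {
    val endless = Iterator.from(1)   // stands in for an effectively infinite source
    val peak = processBounded(endless.take(20), maxInFlight = 3) { _ =>
      Thread.sleep(5)                // simulate a slow consumer
    }
    println(s"peak concurrency: $peak")
  }
}
```

The sketch blocks the polling thread while it waits for a slot, which is exactly the behaviour Patrik calls unattractive above; a dedicated Producer that does the polling would presumably avoid that, and a fixed polling rate such as the 57Hz case would need a scheduler on top.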
Re: [akka-user] Reactive / Akka streams of ByteStrings
Hi Patrik, That can be done, but what bothers me is the potential lack of standardization. Every programmer will solve this problem all over again for each stream they build. Even if particular Reactive Streams implementations offer generic tools, they won't be compatible across implementations or languages, which is the whole point of Reactive Streams. Any processor in a stream that emits new ByteStrings needs to know the allowed or preferred min/max size to emit. E.g., an uncompressing processor that expands a 1 kB input to a 1 MB output may need to avoid sending the whole 1 MB as a single ByteString (or you end up with zip bombs). The output size to use depends on the processor that comes after it (and not just on global per-stream settings). Some consumers can process any size input very quickly, so if you have a big ByteString in memory, you should send it over to free your own buffer. Others may need to buffer their input and this can consume a lot of extra memory - e.g. if the pipeline is based on byte arrays and splitting these means making copies. (ByteStrings mostly avoid copying, but what about non-Scala implementations that you need to work with, things like Netty?) Some processors might require exactly sized input chunks (e.g. encryption), and you may want to make sure the buffering/chunking is done once (in the producer) and not twice (in both producer and consumer) for efficiency. All of this argues for a way for byte stream producers/consumers to communicate directly about their preferences. Also, even if the programmer is going to specify the correct sizes manually for each producer, those producers still have to be configurable in this way. This is something I feel would benefit greatly from a standardized approach. Without it, I'm afraid that a lot of the idioms, examples, and library functions that will evolve around reactive streams won't be applicable to byte streams.
Generic buffering/chunking idioms, like collecting up to X items, or buffering and batching them, need a 'flatMap'-like transformation to apply to byte streams. I think the way forward is to propose optional 'byte stream' interfaces extending the Producer/Consumer ones and see what sticks - but this has to wait for Akka Streams to mature. In particular, I feel akka-http will have to solve this problem somehow. E.g., if I write a spray server and I have an HttpResponsePart producer, how do I know what size ByteStrings to send to the consumer that ships these out to the network? Daniel On Mon, Apr 21, 2014 at 2:50 PM, Patrik Nordwall patrik.nordw...@gmail.com wrote: On 20 Apr 2014, at 00:32, Daniel Armak danar...@gmail.com wrote: Hi, How would I use reactive streams in general, and/or akka streams in particular, to represent a stream of binary data? I'm thinking of all binary streams that don't have a natural division into chunks of a fixed (small) size; you can divide them into chunks any way you want and all processors in the pipeline will still work. Like downloading a big file over HTTP, decompressing and saving it to disk. The natural representation is a stream of ByteString (or byte[] in Java). But each ByteString can be arbitrarily large. Can it? Wouldn't it be good if a ByteString producer has a chunk size limit as part of its contract? Then a chunk corresponds to 1 element. /Patrik It's no good to tell the producer I'm willing to accept one more element, if I have no idea what size it's going to be. Maybe the producer is reading from a 100MB ByteString it already has in memory, and the easiest thing for it to do (i.e. the easiest way for a programmer to code the producer) is to send all of the data as one element. What I really want is to tell it how many more bytes (or characters, etc) I'm willing to accept, but in the current reactive streams API that would require sending each byte in a separate call to onNext.
Some of the implementations might address this scenario, but it seems to me that this will be a common use case and so standardization and interoperability would be of value. I'm sure this has all been thought of. What's the recommended usage pattern? - Also, some processors do have a natural chunk size (e.g. compression, encryption, network packet transmission). This size will rarely match the size of the incoming chunks (if only because different processors in a pipeline have different chunk sizes). To maintain efficiency it might be desirable for some processors to buffer their output and only forward chunks of discrete sizes. This, too, might benefit from typed declarations of the chunk sizes each processor prefers or requires as input. Otherwise the programmer will have to configure each processor manually to do the right amount of buffering, because the processor won't be automatically aware of the preferences of the next processor in the pipeline. This is a more complex scenario, so I expect it will be left to each implementation to
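To make the 'flatMap'-like re-chunking idea from this thread concrete, here is a small sketch using a plain `Iterator[Array[Byte]]` as a stand-in for a stream of ByteStrings. It uses neither the Reactive Streams Producer/Consumer interfaces nor Akka's ByteString, and the names `RechunkSketch`, `capChunkSize` and `exactChunks` are invented for this example. The first function only enforces a maximum element size (the "chunk size limit as part of the contract" case); the second produces exactly sized chunks, as an encryption-style processor might require.

```scala
object RechunkSketch {
  /** Split oversized chunks so that no emitted chunk exceeds `maxSize` bytes.
    * Chunks are never merged, and empty chunks are dropped. */
  def capChunkSize(chunks: Iterator[Array[Byte]], maxSize: Int): Iterator[Array[Byte]] = {
    require(maxSize > 0, "maxSize must be positive")
    chunks.flatMap(chunk => chunk.grouped(maxSize))
  }

  /** Re-chunk into pieces of exactly `size` bytes, buffering across input chunk
    * boundaries; only the final piece may be shorter. */
  def exactChunks(chunks: Iterator[Array[Byte]], size: Int): Iterator[Array[Byte]] = {
    require(size > 0, "size must be positive")
    chunks.flatMap(_.iterator).grouped(size).map(_.toArray)
  }

  def main(args: Array[String]): Unit = {
    def input = Iterator(new Array[Byte](10), new Array[Byte](3), new Array[Byte](7))
    println(capChunkSize(input, 4).map(_.length).toList) // List(4, 4, 2, 3, 4, 3)
    println(exactChunks(input, 4).map(_.length).toList)  // List(4, 4, 4, 4, 4)
  }
}
```

Both functions are lazy, so the upstream iterator is only advanced as far as the downstream asks. Unlike ByteString, this sketch copies bytes (and `exactChunks` boxes each byte), so it shows the shape of the transformation rather than an efficient implementation. It also hard-codes the size at construction time, which is the manual configuration the message argues a standard interface should make unnecessary.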
Re: [akka-user] Best way to communicate with an Actor?
If you start with the assumption that you have the ActorRefs for the X,Y,Z in someactor, I don't think you have a problem. Just tell the 3 actors directly.
[akka-user] remote class loading
I am very new to Akka and still just playing around with it. I wonder if it is possible to use Akka to do dynamic class loading. The scenario is a server / client setup where the server is never rebooted. At compile time the server knows about a trait which I can pattern match against. This trait is implemented in a test case class which the server knows. I start the server and afterwards the client. When I send the test case class to an actor on the server everything works OK. But if I add another case class implementing the same trait and send that to the server (which doesn't have the class file for the new case class), then I get java.lang.ClassNotFoundException. This is not a big surprise - but I wonder if Akka makes it possible to send new class files to the server? Client code: http://pastebin.com/avAdBBwg Server code with simple actor: http://pastebin.com/G0sxW31K Hopefully this question makes sense. Happy Easter!
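As a plain-JVM illustration of the failure described above (no Akka involved, and not the code from the pastebin links): turning a received message back into an object requires resolving its class by name on the receiving side, and that lookup fails for any class whose class file the server does not have. `ClassLookupSketch`, `resolve` and the class name `sample.NotDeployedCommand` are made up for this sketch.

```scala
import scala.util.{Failure, Try}

object ClassLookupSketch {
  /** Resolve a class by name, as the receiving side of a serialized message has to. */
  def resolve(className: String): Try[Class[_]] =
    Try(Class.forName(className))

  def main(args: Array[String]): Unit = {
    // A class that is on the classpath resolves fine.
    println(resolve("java.lang.String").isSuccess)

    // A class that only the sender has compiled cannot be resolved here.
    resolve("sample.NotDeployedCommand") match {
      case Failure(e: ClassNotFoundException) => println(s"not found: ${e.getMessage}")
      case other                              => println(other)
    }
  }
}
```

This only shows where the exception comes from; whether Akka offers a way to ship the missing class files is the open question of the post.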
[akka-user] akka-kafka consumer
Since there were a few kafka related questions here lately, figured I would factor this code out of some stuff I'm doing in case it's of use to others. https://github.com/sclasen/akka-kafka The consumer does explicit management of committing offsets, and allows actor based async processing of messages from kafka while keeping an upper bound on the maximum number of in-flight messages. Happy to accept PRs, issues, etc. version 0.0.2 is in maven central.
Re: [akka-user] Akka 2.3.2 Released
Hello, Thanks for the new release. I am missing akka-testkit 2.3.2 for Scala 2.11 from Maven Central. Is this still forthcoming, or is akka-testkit replaced by some other module? Kind regards, Erik.

Patrik Nordwall wrote on 09-04-14 15:16:

Dear hAkkers,

We, the Akka committers, are pleased to be able to announce the availability of Akka 2.3.2. This is the second maintenance release of the 2.3 branch. It contains fixes for several issues. This release is backwards binary compatible with version 2.3.0 and 2.3.1 which means that the new JARs are a drop-in replacement for the old ones (but not the other way around) as long as your build does not enable the inliner (Scala-only restriction). Always make sure to use at least the latest version required by any of your project’s dependencies.

Migrating from Older Releases

When migrating an existing project from Akka 2.2.x please have a look at our migration guide http://doc.akka.io/docs/akka/2.3.2/project/migration-guide-2.2.x-2.3.x.html.

2.3.2 compared to 2.3.1:
* 5 tickets closed
* 66 files changed, 637 insertions(+), 174 deletions(-)
* … and a total of 2 committers!

Full list of fixed tickets:
* 3957 EventsourcedProcessor must always reject external Persistent messages
* 3967 Second node is unable to join a cluster, regression in 2.3.1
* 3973 joinSeedNodeProcess is not unique Exception at the Cluster Startup
* 3974 Some address information was lost in the serialization of persistent messages
* 3975 Check for invalid id extraction in Cluster Sharding

Credits
commits  added  removed
5        182    100      Patrik Nordwall
2        283    38       Björn Antonsson

Additional Release Details

The artifacts comprising this release have been published to https://oss.sonatype.org/content/repositories/releases/ and also to Maven Central. In addition, we adopted the sbt standard of encoding the Scala binary version in the artifact name, i.e. the core actor package’s artifactId is “akka-actor_2.10”.
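As an illustration of the artifact naming mentioned above, a build.sbt fragment might look roughly like this. The `scalaVersion` value is an assumption made for the example; the announcement itself only gives the artifactId “akka-actor_2.10” and the release version 2.3.2.

```scala
// build.sbt (fragment)

// Assumed for this example: any 2.10.x version yields the binary version suffix _2.10.
scalaVersion := "2.10.4"

// %% appends the Scala binary version to the artifact name,
// so this resolves the artifactId akka-actor_2.10.
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.2"

// Equivalent explicit form with a single %, spelling the suffix out by hand:
// libraryDependencies += "com.typesafe.akka" % "akka-actor_2.10" % "2.3.2"
```

With `%%`, changing `scalaVersion` changes the suffix that sbt looks for, so a dependency resolves only if the artifact was actually published for that Scala binary version.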
Website:
* http://akka.io/
Maven repository:
* http://repo.akka.io/releases
Binary distribution:
* http://akka.io/downloads/
Documentation:
* http://doc.akka.io/docs/akka/2.3.2/
* http://doc.akka.io/api/akka/2.3.2/
* http://doc.akka.io/japi/akka/2.3.2/
Issue tracker:
* http://www.assembla.com/spaces/akka/
Mailing list:
* https://groups.google.com/group/akka-user

Akka is released under the Apache V2 license.

Happy hAkking!

-- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw
Scala Days, June 16th-18th, Berlin: http://www.scaladays.org/

-- Erik van Oosten http://www.day-to-day-stuff.blogspot.com/