Re: [akka-user] Trying to understand a sudden drop in throughput with Akka IO
Hi, On Tue, Dec 23, 2014 at 12:40 AM, Soumya Simanta soumya.sima...@gmail.com wrote: Endre, Thank you for taking the time to explain everything. It was really helpful not only in understanding the streams basics but also to create a better/faster version of what I'm trying to do. Before I go any further I want to say that I love Akka streams and it is going to be a useful API for a lot of my future work. Thanks to the Akka team. I tweaked both the dispatchers settings as well as the type of dispatcher used by default dispatcher. The program still ends up taking a good deal of my CPU (NOTE: The screenshot below is for FJP and not a ThreadpoolExecutor but I see similar usage with TPE). I wouldn't worry too much about CPU usage right now this can be an artifact of various scheduling effects (there is a pinned dispatcher, FJP can also distort measurements). You can try to use several parallel streams instead of one and see how things scale out horizontally. The memory footprint is always under control as excepted. I gave 12G of heap space to the JVM. The frequency of young generation GC depends on the MaterializerSettings buffer sizes. I've not tweaked the GC yet. Do you think that can make a difference ? Since more random elements (boxed integers) are kept in memory longer with higher buffers sizes, this is expected. In reality you would store real domain objects which are already allocated so that is less of an issue. BTW, does the a size of 64 mean that there will be 64 items in each buffer in the pipeline. I bumped it to 512 and saw an increase in throughput. I wouldn't go above 128. Here is the configuration and screenshots of one of the better runs I had. I'm not sure if I'm limited by TCP or how Rediscala is using Akka IO at this point. Any further insights will be very useful and appreciated. In the mean time I'll continue to play around with different values. I believe you maxed out the streams part, so any other bottleneck will be very likely in the Rediscala client or below. Your screenshot shows that around 70MByte/s is achieved which around 0,5Gbit/s. Assuming that TCP is used this is not bad at all. -Endre Thanks again ! My machine config is *Darwin 14.0.0 Darwin Kernel Version 14.0.0: Fri Sep 19 00:26:44 PDT 2014; root:xnu-2782.1.97~2/RELEASE_X86_64 x86_64* Processor Name: Intel Core i7 Processor Speed: 2.6 GHz Number of Processors: 1 Total Number of Cores: 4 L2 Cache (per Core): 256 KB L3 Cache: 6 MB Memory: 16 GB *application.conf * rediscala { rediscala-client-worker-dispatcher { mailbox-type = akka.dispatch.SingleConsumerOnlyUnboundedMailbox throughput = 1000 } } actor { default-dispatcher { type = Dispatcher executor = fork-join-executor default-executor { fallback = fork-join-executor } # This will be used if you have set executor = fork-join-executor fork-join-executor { # Min number of threads to cap factor-based parallelism number to parallelism-min = 5 # Max number of threads to cap factor-based parallelism number to parallelism-max = 5 } throughput = 1000 } } I'm using the following for the FlowMaterializer val settings = MaterializerSettings(system) implicit val materializer = FlowMaterializer(settings.copy(maxInputBufferSize = *512*, initialInputBufferSize = *512*)) https://lh6.googleusercontent.com/-gLBJ7tgfRN4/VJipabIoLgI/vTw/9DdnDszQ55o/s1600/rediscala_network_IO_5Million_backpressure.png https://lh4.googleusercontent.com/-USKroaRYgco/VJipgfoSIOI/vT4/nUU-y9BCRXs/s1600/rediscala_network_IO_5Million_backpressure_threads.png https://lh6.googleusercontent.com/-w7_OA9C5f7k/VJipl0YW8-I/vUA/wpoDmD0F9xM/s1600/rediscala_network_IO_5Million_backpressure_cpu_memory.png On Monday, December 22, 2014 3:56:30 AM UTC-5, Akka Team wrote: Hi Soumya First of all, the performance of Akka IO (the original actor based one) might be slow or fast, but it does not degrade if writes are properly backpressured. Also it does not use Futures at all, so I guess this is an artifact of how you drive it. Now your first reactive-streams approach didn't work because the map stage that created an actor already let in the next element as soon as the actor was created. What you want is to let in the next element after the elements has been written. In other words you just created an ever growing number of actors without waiting for the write to complete. Your second solution is correct though because mapAsyncUnordered only lets in the next elements when the passed future completes -- which in your case corresponds to a finished write. As for the CPU usage, without the actual profile it doesn't say too much. For example if your ForkJoinPool (the default dispatcher) has not much work to do, it will spend a lot of time in its work stealing cycle (scan()) because
Re: [akka-user] Trying to understand a sudden drop in throughput with Akka IO
On Tue, Dec 23, 2014 at 4:02 AM, Soumya Simanta soumya.sima...@gmail.com wrote: Another akka-streams back pressure related question in context of the following piece of code. def insertValues(rnd: String): Flow[Int, Boolean] = { Flow[Int].mapAsyncUnordered(k = redis.set(k + rnd, message)) } val maxSeq = 500 val seqSource = Source( () = (1 to maxSeq).iterator ) *val streamWithSeqSource = seqSource.via(insertValues(random1)).runWith(blackhole)* My understanding is that the next request is send to Redis from the client only after a single Future is completed. Is this correct ? No, the number of allowed uncompleted Futures is defined by the buffer size. If there wouldn't be a parallelization between Futures then there would be no need for an ordered and unordered version of the same operation. Is there a way I can batch a bunch of set requests and wait for them to be over before I can send a new batch ? If there would be a version of set that accepts a Seq[] of writes, let's say batchSet then you could use: seqSource.grouped(100).mapAsyncUnordered(ks = redis.batchSet(...)) Where grouped makes maximum 100 sized groups from the stream of elements resulting in a stream of sequences. You need API support for that from the Redis client though. -Endre On Monday, December 22, 2014 6:40:30 PM UTC-5, Soumya Simanta wrote: Endre, Thank you for taking the time to explain everything. It was really helpful not only in understanding the streams basics but also to create a better/faster version of what I'm trying to do. Before I go any further I want to say that I love Akka streams and it is going to be a useful API for a lot of my future work. Thanks to the Akka team. I tweaked both the dispatchers settings as well as the type of dispatcher used by default dispatcher. The program still ends up taking a good deal of my CPU (NOTE: The screenshot below is for FJP and not a ThreadpoolExecutor but I see similar usage with TPE). The memory footprint is always under control as excepted. I gave 12G of heap space to the JVM. The frequency of young generation GC depends on the MaterializerSettings buffer sizes. I've not tweaked the GC yet. Do you think that can make a difference ? BTW, does the a size of 64 mean that there will be 64 items in each buffer in the pipeline. I bumped it to 512 and saw an increase in throughput. Here is the configuration and screenshots of one of the better runs I had. I'm not sure if I'm limited by TCP or how Rediscala is using Akka IO at this point. Any further insights will be very useful and appreciated. In the mean time I'll continue to play around with different values. Thanks again ! My machine config is *Darwin 14.0.0 Darwin Kernel Version 14.0.0: Fri Sep 19 00:26:44 PDT 2014; root:xnu-2782.1.97~2/RELEASE_X86_64 x86_64* Processor Name: Intel Core i7 Processor Speed: 2.6 GHz Number of Processors: 1 Total Number of Cores: 4 L2 Cache (per Core): 256 KB L3 Cache: 6 MB Memory: 16 GB *application.conf * rediscala { rediscala-client-worker-dispatcher { mailbox-type = akka.dispatch.SingleConsumerOnlyUnboundedMailbox throughput = 1000 } } actor { default-dispatcher { type = Dispatcher executor = fork-join-executor default-executor { fallback = fork-join-executor } # This will be used if you have set executor = fork-join-executor fork-join-executor { # Min number of threads to cap factor-based parallelism number to parallelism-min = 5 # Max number of threads to cap factor-based parallelism number to parallelism-max = 5 } throughput = 1000 } } I'm using the following for the FlowMaterializer val settings = MaterializerSettings(system) implicit val materializer = FlowMaterializer(settings.copy(maxInputBufferSize = *512*, initialInputBufferSize = *512*)) https://lh6.googleusercontent.com/-gLBJ7tgfRN4/VJipabIoLgI/vTw/9DdnDszQ55o/s1600/rediscala_network_IO_5Million_backpressure.png https://lh4.googleusercontent.com/-USKroaRYgco/VJipgfoSIOI/vT4/nUU-y9BCRXs/s1600/rediscala_network_IO_5Million_backpressure_threads.png https://lh6.googleusercontent.com/-w7_OA9C5f7k/VJipl0YW8-I/vUA/wpoDmD0F9xM/s1600/rediscala_network_IO_5Million_backpressure_cpu_memory.png On Monday, December 22, 2014 3:56:30 AM UTC-5, Akka Team wrote: Hi Soumya First of all, the performance of Akka IO (the original actor based one) might be slow or fast, but it does not degrade if writes are properly backpressured. Also it does not use Futures at all, so I guess this is an artifact of how you drive it. Now your first reactive-streams approach didn't work because the map stage that created an actor already let in the next element as soon as the actor was created. What you want is to let in the next element after the elements has been written. In other words you just created an
Re: [akka-user] should cluster matrix should add akka-remote/network throughput
Hi, On Mon, Dec 22, 2014 at 7:12 PM, 何品 hepin1...@gmail.com wrote: should cluster matrix add node to node throughput or network interface throughput,so we could see how busy between nodes What do you mean by cluster matrix? -Endre -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team Typesafe - The software stack for applications that scale Blog: letitcrash.com Twitter: @akkateam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka-http (experimental) file server
Hi Allan, On Mon, Dec 22, 2014 at 9:03 PM, Allan Brighton allane...@gmail.com wrote: Is there an example http file server and client like this that works with the latest 1.0-M1 version? Wait a little more and you will see some http documentation coming (1.0-M2 is coming! Ho Ho Ho!). The API got simplified, so you no longer need to do IO(Http) ? X. I posted a simple example for using Http client with the new API already on this ML: https://groups.google.com/d/msg/akka-user/-mFoVZvK2wk/3i-sApqWDPIJ -Endre On Thursday, November 20, 2014 4:11:38 PM UTC+1, rklaehn wrote: On Thu, Nov 20, 2014 at 3:31 PM, Allan Brighton alla...@gmail.com wrote: Thanks for the http server example. It works fine. Do you also have a matching example for the client side? I found akka.http.TestClient, which is a start, but does not give any hints about reading the data from the server. Sure. Based on the TestClient example, here is one that reads data from the server and dumps it on the console. https://gist.github.com/rklaehn/3f26c3f80e5870831f52#file-client-example However, last time I tried this did not properly handle chunked encoding. But that was 0.10-M1. I will have to try again. Also, I hope there will be a less verbose way to do a simple GET request. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team Typesafe - The software stack for applications that scale Blog: letitcrash.com Twitter: @akkateam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] should cluster matrix should add akka-remote/network throughput
means we may would want to see how busy between nodes.the message is send to remote actor via remote transport. 在 2014年12月23日星期二UTC+8下午5时26分52秒,Akka Team写道: Hi, On Mon, Dec 22, 2014 at 7:12 PM, 何品 hepi...@gmail.com javascript: wrote: should cluster matrix add node to node throughput or network interface throughput,so we could see how busy between nodes What do you mean by cluster matrix? -Endre -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team Typesafe - The software stack for applications that scale Blog: letitcrash.com Twitter: @akkateam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] akka-persistence and routing
Hi, I have a system that receives messages and has to route them (based on some complicated logic) to a number of destinations reliably (with at-least-once semantics). It seems that the most natural way to achieve the routing part is to create a Router with custom RoutingLogic. The reliability part is easy to achieve using (akka-persistence 's) AtLeastOnceDelivery. However, I don't see how to integrate the two approaches together, and would love to hear if anyone has any ideas. So a little more details on what I'm looking for. I need an actor that, when it receives a message: - Applies some logic to the message to determine its destinations. - Sends the message to those destinations and await acknowledgements, retrying failed deliveries. - After collecting all the acknowledgements, acknowledge the original message. Obviously, I can create an AtLeastOnceDelivery actor and implement the routing and ack management myself, but I was wondering if there was a cleaner approach here. Thanks, Tal -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Akka Streams HTTP 1.0 MILESTONE 2
*Dear Hakkers,* We are happy to announce that the second milestone of the still brewing Akka Streams and Akka Http modules is now available. This milestone contains a new Streams and Http manual that smooths the path to start using these modules. While not yet fully complete there is already a wealth of information for the interested reader to consume. On top of the detailed topic pages – as an appetizer and dessert – we recommend taking a look at our new *Quickstart *and *Cookbook* pages. We must apologize to our Java friends since at this point only Scala documentation pages are available. Stay tuned for updates! Until then you can still look at the Scala part since the Java API closely matches. In addition to the documentation pages we sneaked in small improvements and bug fixes including: - Http client now does not do a half-close on the underlying TCP connection - No more ZIP bombs; Zipped streams are now extracted using completely bounded memory - ZipWith now has variants up to 22 parameters For the full list of small presents see the closed milestone on GitHub: https://github.com/akka/akka/issues?q=milestone%3Astreams-1.0-M2 https://github.com/akka/akka/milestones/streams-1.0-M2 We also want to give our thanks to all of the contributors who helped with this milestone: - 2beaucoup - Rich Dougherty (richdougherty) In the next cold weeks the Akka bear goes back to its cave to crawl out again in early January. Don't hesitate though to flood us with feedback, tickets or pull requests during this period. - *Akka Streams and Http Documentation (Scala)*: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala.html - *Akka GitHub Repository* *(remember, Streams and Http live on the release-2.3-dev branch)*: https://github.com/akka/akka/tree/release-2.3-dev - *Akka GitHub Issues:* https://github.com/akka/akka/issues With love, *- the Akka Team* -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Akka Streams HTTP 1.0 MILESTONE 2
Thanks for the improved documentation. On Tuesday, December 23, 2014 7:26:24 AM UTC-5, Akka Team wrote: *Dear Hakkers,* We are happy to announce that the second milestone of the still brewing Akka Streams and Akka Http modules is now available. This milestone contains a new Streams and Http manual that smooths the path to start using these modules. While not yet fully complete there is already a wealth of information for the interested reader to consume. On top of the detailed topic pages – as an appetizer and dessert – we recommend taking a look at our new *Quickstart *and *Cookbook* pages. We must apologize to our Java friends since at this point only Scala documentation pages are available. Stay tuned for updates! Until then you can still look at the Scala part since the Java API closely matches. In addition to the documentation pages we sneaked in small improvements and bug fixes including: - Http client now does not do a half-close on the underlying TCP connection - No more ZIP bombs; Zipped streams are now extracted using completely bounded memory - ZipWith now has variants up to 22 parameters For the full list of small presents see the closed milestone on GitHub: https://github.com/akka/akka/issues?q=milestone%3Astreams-1.0-M2 https://github.com/akka/akka/milestones/streams-1.0-M2 We also want to give our thanks to all of the contributors who helped with this milestone: - 2beaucoup - Rich Dougherty (richdougherty) In the next cold weeks the Akka bear goes back to its cave to crawl out again in early January. Don't hesitate though to flood us with feedback, tickets or pull requests during this period. - *Akka Streams and Http Documentation (Scala)*: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala.html - *Akka GitHub Repository* *(remember, Streams and Http live on the release-2.3-dev branch)*: https://github.com/akka/akka/tree/release-2.3-dev - *Akka GitHub Issues:* https://github.com/akka/akka/issues With love, *- the Akka Team* -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Akka Streams HTTP 1.0 MILESTONE 2
Great News !!! Piyush Mishra *Blog* https://www.linkedin.com/in/piyush1989 | *LinkedIn https://www.linkedin.com/in/piyush1989* Skype : piyush.mishra275 Hangout :piyushmishra889 Mobile : +91-7506820534 On Tue, Dec 23, 2014 at 7:39 PM, Niels Hoogeveen nielshoog...@gmail.com wrote: Thanks for the improved documentation. On Tuesday, December 23, 2014 7:26:24 AM UTC-5, Akka Team wrote: *Dear Hakkers,* We are happy to announce that the second milestone of the still brewing Akka Streams and Akka Http modules is now available. This milestone contains a new Streams and Http manual that smooths the path to start using these modules. While not yet fully complete there is already a wealth of information for the interested reader to consume. On top of the detailed topic pages – as an appetizer and dessert – we recommend taking a look at our new *Quickstart *and *Cookbook* pages. We must apologize to our Java friends since at this point only Scala documentation pages are available. Stay tuned for updates! Until then you can still look at the Scala part since the Java API closely matches. In addition to the documentation pages we sneaked in small improvements and bug fixes including: - Http client now does not do a half-close on the underlying TCP connection - No more ZIP bombs; Zipped streams are now extracted using completely bounded memory - ZipWith now has variants up to 22 parameters For the full list of small presents see the closed milestone on GitHub: https://github.com/akka/akka/issues?q=milestone%3Astreams-1.0-M2 https://github.com/akka/akka/milestones/streams-1.0-M2 We also want to give our thanks to all of the contributors who helped with this milestone: - 2beaucoup - Rich Dougherty (richdougherty) In the next cold weeks the Akka bear goes back to its cave to crawl out again in early January. Don't hesitate though to flood us with feedback, tickets or pull requests during this period. - *Akka Streams and Http Documentation (Scala)*: http://doc.akka.io/docs/akka-stream-and-http-experimental/ 1.0-M2/scala.html http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala.html - *Akka GitHub Repository* *(remember, Streams and Http live on the release-2.3-dev branch)*: https://github.com/akka/akka/ tree/release-2.3-dev - *Akka GitHub Issues:* https://github.com/akka/akka/issues With love, *- the Akka Team* -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka-http HttpService equivalent
Thanks! Thought akka-http-java-experimental was java only APIs. Couldn't find even there in 1.0 M1 equivalents to e.g. spray's HttpServiceBase.runRoute so at least temporarily I had to revert back from akka-http to spray. Kind regards, Stevo Slavic. On Wed, Dec 10, 2014 at 2:54 PM, Björn Antonsson bjorn.antons...@typesafe.com wrote: Hi Stevo, There is no direct equivalent to the spray.routing.HttpService. The closest is probably the akka.http.server.japi.HttpService and akka.http.server.japi.HttpApp. B/ On 9 December 2014 at 16:04:57, Stevo Slavić (ssla...@gmail.com) wrote: Hello Akka community, In akka-http, is there an equivalent to spray.routing.HttpService? Kind regards, Stevo Slavic. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Björn Antonsson Typesafe http://typesafe.com/ – Reactive Apps on the JVM twitter: @bantonsson http://twitter.com/#!/bantonsson -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Akka Streams HTTP 1.0 MILESTONE 2
very very nice work and words 在 2014年12月23日星期二UTC+8下午8时26分24秒,Akka Team写道: *Dear Hakkers,* We are happy to announce that the second milestone of the still brewing Akka Streams and Akka Http modules is now available. This milestone contains a new Streams and Http manual that smooths the path to start using these modules. While not yet fully complete there is already a wealth of information for the interested reader to consume. On top of the detailed topic pages – as an appetizer and dessert – we recommend taking a look at our new *Quickstart *and *Cookbook* pages. We must apologize to our Java friends since at this point only Scala documentation pages are available. Stay tuned for updates! Until then you can still look at the Scala part since the Java API closely matches. In addition to the documentation pages we sneaked in small improvements and bug fixes including: - Http client now does not do a half-close on the underlying TCP connection - No more ZIP bombs; Zipped streams are now extracted using completely bounded memory - ZipWith now has variants up to 22 parameters For the full list of small presents see the closed milestone on GitHub: https://github.com/akka/akka/issues?q=milestone%3Astreams-1.0-M2 https://github.com/akka/akka/milestones/streams-1.0-M2 We also want to give our thanks to all of the contributors who helped with this milestone: - 2beaucoup - Rich Dougherty (richdougherty) In the next cold weeks the Akka bear goes back to its cave to crawl out again in early January. Don't hesitate though to flood us with feedback, tickets or pull requests during this period. - *Akka Streams and Http Documentation (Scala)*: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala.html - *Akka GitHub Repository* *(remember, Streams and Http live on the release-2.3-dev branch)*: https://github.com/akka/akka/tree/release-2.3-dev - *Akka GitHub Issues:* https://github.com/akka/akka/issues With love, *- the Akka Team* -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Broadcast messages to an Akka Sharded Cluster
Hi, I'm just in the early stages of migrating a simulation that uses the old Scala Actors to use Akka instead. The application generates a very large pool of stateful actors that inter-communicate with each other. The functionality in the cluster sharding example (the one the creates blog posts) is quite close to what I need. What I haven't discovered yet is an obvious way to let one actor in the cluster broadcast a message to all of the other actors in the cluster (or a large subset of the cluster) for affinity propagation. I tried a simplistic approach of just sending to a masked path, but that only sent the message to the actors that were on the same node and not the entire cluster. context.actorSelection(/user/sharding/MyPool/*) ! Notify(*Hi, I was just added*) It seems like I should be sending the message to the shard region instead, but the examples I'm finding only show using an IdExtractor to locate a specific actor in the cluster and not to the entire cluster. The secondary shard region in the blog example stores a subset of the cluster (the author listings) is basically the tactic I use in the original simulation (an external mutable map of actor references). Is there a recommend way to implement broadcasting? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Broadcast messages to an Akka Sharded Cluster
i think distributed pub-sub is what you need: http://doc.akka.io/docs/akka/2.3.8/contrib/distributed-pub-sub.html (somewhat hidden under 'External Contributions'). -Michael On 12/23/14 08:33, Terry Heinze wrote: Hi, I'm just in the early stages of migrating a simulation that uses the old Scala Actors to use Akka instead. The application generates a very large pool of stateful actors that inter-communicate with each other. The functionality in the cluster sharding example (the one the creates blog posts) is quite close to what I need. What I haven't discovered yet is an obvious way to let one actor in the cluster broadcast a message to all of the other actors in the cluster (or a large subset of the cluster) for affinity propagation. I tried a simplistic approach of just sending to a masked path, but that only sent the message to the actors that were on the same node and not the entire cluster. context.actorSelection(/user/sharding/MyPool/*) ! Notify(*Hi, I was just added*) It seems like I should be sending the message to the shard region instead, but the examples I'm finding only show using an IdExtractor to locate a specific actor in the cluster and not to the entire cluster. The secondary shard region in the blog example stores a subset of the cluster (the author listings) is basically the tactic I use in the original simulation (an external mutable map of actor references). Is there a recommend way to implement broadcasting? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com mailto:akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com mailto:akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Exception when persisting the ShardCoordinator
Hi all, I'm having a problem with akka-persistence and akka-sharding. Every now and then, sometimes after running our app for a long time I get this error: DELETE FROM USERSIS.snapshot WHERE persistence_id = '/user/sharding/TokenRouterCoordinator/singleton/coordinator' AND sequence_nr = 2 [ERROR] [12/18/2014 13:51:28.826] [TokenCluster-akka.actor.default- dispatcher-16] [akka://TokenCluster/system/snapshot-store] No more data to read from socket java.sql.SQLRecoverableException: No more data to read from socket at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java:1157) at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:350) at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:227) at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:531) at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement. java:208) at oracle.jdbc.driver.T4CPreparedStatement.executeForRows( T4CPreparedStatement.java:1046) at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout( OracleStatement.java:1336) at oracle.jdbc.driver.OraclePreparedStatement.executeInternal( OraclePreparedStatement.java:3613) at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate( OraclePreparedStatement.java:3694) at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate( OraclePreparedStatementWrapper.java:1354) at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate( DelegatingPreparedStatement.java:105) at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate( DelegatingPreparedStatement.java:105) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply$mcI$sp( StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply( StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply( StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$NakedExecutor.apply(StatementExecutor. scala:33) at scalikejdbc.StatementExecutor$$anon$1. scalikejdbc$StatementExecutor$LoggingSQLAndTiming$$super$apply( StatementExecutor.scala:317) at scalikejdbc.StatementExecutor$LoggingSQLAndTiming$class.apply( StatementExecutor.scala:264) at scalikejdbc.StatementExecutor$$anon$1. scalikejdbc$StatementExecutor$LoggingSQLIfFailed$$super$apply( StatementExecutor.scala:317) at scalikejdbc.StatementExecutor$LoggingSQLIfFailed$class.apply( StatementExecutor.scala:295) at scalikejdbc.StatementExecutor$$anon$1.apply(StatementExecutor.scala: 317) at scalikejdbc.StatementExecutor.executeUpdate(StatementExecutor.scala: 337) at scalikejdbc.DBSession$$anonfun$updateWithFilters$1.apply(DBSession. scala:352) at scalikejdbc.DBSession$$anonfun$updateWithFilters$1.apply(DBSession. scala:350) at scalikejdbc.LoanPattern$class.using(LoanPattern.scala:33) at scalikejdbc.ActiveSession.using(DBSession.scala:457) at scalikejdbc.DBSession$class.updateWithFilters(DBSession.scala:349) at scalikejdbc.ActiveSession.updateWithFilters(DBSession.scala:457) at scalikejdbc.DBSession$class.updateWithFilters(DBSession.scala:327) at scalikejdbc.ActiveSession.updateWithFilters(DBSession.scala:457) at scalikejdbc.SQLUpdate$$anonfun$10.apply(SQL.scala:486) at scalikejdbc.SQLUpdate$$anonfun$10.apply(SQL.scala:486) at scalikejdbc.DBConnection$class.autoCommit(DBConnection.scala:183) at scalikejdbc.DB.autoCommit(DB.scala:75) at scalikejdbc.DB$$anonfun$autoCommit$1.apply(DB.scala:218) at scalikejdbc.DB$$anonfun$autoCommit$1.apply(DB.scala:217) at scalikejdbc.LoanPattern$class.using(LoanPattern.scala:33) at scalikejdbc.DB$.using(DB.scala:150) at scalikejdbc.DB$.autoCommit(DB.scala:217) at scalikejdbc.SQLUpdate.apply(SQL.scala:486) at akka.persistence.jdbc.snapshot.GenericStatements$class.deleteSnapshot (Statements.scala:30) at akka.persistence.jdbc.snapshot.OracleSyncSnapshotStore.deleteSnapshot (SnapshotStores.scala:16) at akka.persistence.jdbc.snapshot.JdbcSyncSnapshotStore$class.delete( JdbcSyncSnapshotStore.scala:34) at akka.persistence.jdbc.snapshot.OracleSyncSnapshotStore.delete( SnapshotStores.scala:16) at akka.persistence.snapshot.SnapshotStore$$anonfun$receive$1. applyOrElse(SnapshotStore.scala:44) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.persistence.jdbc.snapshot.OracleSyncSnapshotStore.aroundReceive( SnapshotStores.scala:16) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor .java:617) at java.lang.Thread.run(Thread.java:745) So it looks like the persistence of the shard coordinators is failing. We are
[akka-user] Future returned by iterating group completes before group flows are fully processed
In the Activate example for groupBy, we see this: Source(() = logFile.getLines()). // group them by log level groupBy { case LoglevelPattern(level) = level case other = OTHER }. // write lines of each group to a separate file foreach { case (level, groupFlow) = val output = new PrintWriter(new FileOutputStream(starget/log-$level.txt), true) // close resource when the group stream is completed // foreach returns a future that we can key the close() off of groupFlow.foreach(line = output.println(line)).onComplete(_ = Try(output.close())) }. onComplete { _ = Try(logFile.close()) system.shutdown() } I have been following this pattern, but am finding that the Future returned by the foreach on the groupBy completes BEFORE the groupFlows are finished processing. In effect, this leaves several messages yet to be processed. I've observed this issue in akka-stream-experimental M1 and M2. I have created the following gist which demonstrates the problem: https://gist.github.com/timcharper/9824eee567f24b4205f3 Is the Activator example a bad example of how to use streams? Or is akka streams misbehaving? (is it impossible for a group stream to know that it's groupFlows are completed)? Thanks, Tim -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Future returned by iterating group completes before group flows are fully processed
Sorry, I should mention the Activator example to which I am referring is this one: https://github.com/typesafehub/activator-akka-stream-scala/blob/21ac90bead7917bfb0ebb0a0858e5ec60dce966c/src/main/scala/sample/stream/GroupLogFile.scala On Tuesday, December 23, 2014 12:24:21 PM UTC-7, Tim Harper wrote: In the Activate example for groupBy, we see this: Source(() = logFile.getLines()). // group them by log level groupBy { case LoglevelPattern(level) = level case other = OTHER }. // write lines of each group to a separate file foreach { case (level, groupFlow) = val output = new PrintWriter(new FileOutputStream(s target/log-$level.txt), true) // close resource when the group stream is completed // foreach returns a future that we can key the close() off of groupFlow.foreach(line = output.println(line)).onComplete(_ = Try (output.close())) }. onComplete { _ = Try(logFile.close()) system.shutdown() } I have been following this pattern, but am finding that the Future returned by the foreach on the groupBy completes BEFORE the groupFlows are finished processing. In effect, this leaves several messages yet to be processed. I've observed this issue in akka-stream-experimental M1 and M2. I have created the following gist which demonstrates the problem: https://gist.github.com/timcharper/9824eee567f24b4205f3 Is the Activator example a bad example of how to use streams? Or is akka streams misbehaving? (is it impossible for a group stream to know that it's groupFlows are completed)? Thanks, Tim -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Exception when persisting the ShardCoordinator
Hi Miguel, This seems to be a journal issue. You should contact the maintainer of the JDBC journal. -Endre On Tue, Dec 23, 2014 at 7:43 PM, Miguel Vilá miguelv...@seven4n.com wrote: Hi all, I'm having a problem with akka-persistence and akka-sharding. Every now and then, sometimes after running our app for a long time I get this error: DELETE FROM USERSIS.snapshot WHERE persistence_id = '/user/sharding/TokenRouterCoordinator/singleton/coordinator' AND sequence_nr = 2 [ERROR] [12/18/2014 13:51:28.826] [TokenCluster-akka.actor.default- dispatcher-16] [akka://TokenCluster/system/snapshot-store] No more data to read from socket java.sql.SQLRecoverableException: No more data to read from socket at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java:1157 ) at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:350) at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:227) at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:531) at oracle.jdbc.driver.T4CPreparedStatement.doOall8( T4CPreparedStatement.java:208) at oracle.jdbc.driver.T4CPreparedStatement.executeForRows( T4CPreparedStatement.java:1046) at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout( OracleStatement.java:1336) at oracle.jdbc.driver.OraclePreparedStatement.executeInternal( OraclePreparedStatement.java:3613) at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate( OraclePreparedStatement.java:3694) at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate( OraclePreparedStatementWrapper.java:1354) at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate( DelegatingPreparedStatement.java:105) at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate( DelegatingPreparedStatement.java:105) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply$mcI$sp (StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply( StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply( StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$NakedExecutor.apply(StatementExecutor .scala:33) at scalikejdbc.StatementExecutor$$anon$1. scalikejdbc$StatementExecutor$LoggingSQLAndTiming$$super$apply( StatementExecutor.scala:317) at scalikejdbc.StatementExecutor$LoggingSQLAndTiming$class.apply( StatementExecutor.scala:264) at scalikejdbc.StatementExecutor$$anon$1. scalikejdbc$StatementExecutor$LoggingSQLIfFailed$$super$apply( StatementExecutor.scala:317) at scalikejdbc.StatementExecutor$LoggingSQLIfFailed$class.apply( StatementExecutor.scala:295) at scalikejdbc.StatementExecutor$$anon$1.apply(StatementExecutor.scala :317) at scalikejdbc.StatementExecutor.executeUpdate(StatementExecutor.scala :337) at scalikejdbc.DBSession$$anonfun$updateWithFilters$1.apply(DBSession. scala:352) at scalikejdbc.DBSession$$anonfun$updateWithFilters$1.apply(DBSession. scala:350) at scalikejdbc.LoanPattern$class.using(LoanPattern.scala:33) at scalikejdbc.ActiveSession.using(DBSession.scala:457) at scalikejdbc.DBSession$class.updateWithFilters(DBSession.scala:349) at scalikejdbc.ActiveSession.updateWithFilters(DBSession.scala:457) at scalikejdbc.DBSession$class.updateWithFilters(DBSession.scala:327) at scalikejdbc.ActiveSession.updateWithFilters(DBSession.scala:457) at scalikejdbc.SQLUpdate$$anonfun$10.apply(SQL.scala:486) at scalikejdbc.SQLUpdate$$anonfun$10.apply(SQL.scala:486) at scalikejdbc.DBConnection$class.autoCommit(DBConnection.scala:183) at scalikejdbc.DB.autoCommit(DB.scala:75) at scalikejdbc.DB$$anonfun$autoCommit$1.apply(DB.scala:218) at scalikejdbc.DB$$anonfun$autoCommit$1.apply(DB.scala:217) at scalikejdbc.LoanPattern$class.using(LoanPattern.scala:33) at scalikejdbc.DB$.using(DB.scala:150) at scalikejdbc.DB$.autoCommit(DB.scala:217) at scalikejdbc.SQLUpdate.apply(SQL.scala:486) at akka.persistence.jdbc.snapshot.GenericStatements$class. deleteSnapshot(Statements.scala:30) at akka.persistence.jdbc.snapshot.OracleSyncSnapshotStore. deleteSnapshot(SnapshotStores.scala:16) at akka.persistence.jdbc.snapshot.JdbcSyncSnapshotStore$class.delete( JdbcSyncSnapshotStore.scala:34) at akka.persistence.jdbc.snapshot.OracleSyncSnapshotStore.delete( SnapshotStores.scala:16) at akka.persistence.snapshot.SnapshotStore$$anonfun$receive$1. applyOrElse(SnapshotStore.scala:44) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.persistence.jdbc.snapshot.OracleSyncSnapshotStore. aroundReceive(SnapshotStores.scala:16) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at
Re: [akka-user] Future returned by iterating group completes before group flows are fully processed
Hi Tim, I have been following this pattern, but am finding that the Future returned by the foreach on the groupBy completes BEFORE the groupFlows are finished processing. This is completely normal. The master stream finishes as soon as there are no more gourps, it has nothing to do with group streams being finished (or at least there is a possible race). In fact every stream finishes as soon as the last element has been processed. In this case the last element is the last group, and processing finishes from the viewpoint of that stream as soon as you finished calling foreach on the substream (which is usually earlier when that foreach itself finishes). Is the Activator example a bad example of how to use streams? It is a faulty example it seems, it demonstrates streams properly, but it demonstrates shutdown order wrongly: shutting down the system happens too early -- it should happen when all writing streams are finished, not when all writing streams has been started. I opened a ticket: https://github.com/typesafehub/activator-akka-stream-scala/issues/18 Or is akka streams misbehaving? No, it is not. (is it impossible for a group stream to know that it's groupFlows are completed)? Yes, there is a cookbook sample that is similar: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Implementing_reduce-by-key Although that recipe is called reduce-by-key it follows a similar pattern. Modification of the activator sample is simple. replace: groupFlow.foreach(line = output.println(line)).onComplete(_ = Try (output.close())) with (pseudocode, won't compile) groupFlow.foreach(line = output.println(line)).andThen(_ = Try (output.close())) This change causes the above statement to return a Future[Unit] instead of just Unit. Now on the master stream use .mapAsyncUnordered(identity).onComplete(close input file) These changes basically do the following: The first change makes the stream of groups become a stream of completion Futures (Future[Unit]), while the second modification causes the stream to flatten these futures by waiting for them (Unordered since we don't care about the order). Now when this new stream finishes, it also means that all Futures has been finished, which means that all output.close() has been called (because of the andThen combinator on the Future), which means that all foreach blocks on the substreams has been finished. This is exactly what you want. Just like it is explained in the recipe you actually want to call: .buffer(MaxGroups, OverflowStrategy.error).mapAsyncUnordered(identity).onComplete(close input file) With MaxGroups being large enough to contain all possible groups. The reason for this is explained in the recipe. -Endre Thanks, Tim -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team Typesafe - The software stack for applications that scale Blog: letitcrash.com Twitter: @akkateam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Future returned by iterating group completes before group flows are fully processed
Thank you for the quick response, Andre! After thinking through The Meat slide in Dr. Roland Kuhn's fantastic presentation, I was thinking, just how would the onComplete method for the groupBy know that the individual groupBy flows were complete? So yes, it makes perfect sense, logically, why this behavior is occurring. I've commented the ticket https://github.com/typesafehub/activator-akka-stream-scala/issues/18 https://github.com/typesafehub/activator-akka-stream-scala/issues/18 so perhaps further discussion of mapAsync vs folding futures can be done there. Tim On Dec 23, 2014, at 13:03, Akka Team akka.offic...@gmail.com wrote: Hi Tim, I have been following this pattern, but am finding that the Future returned by the foreach on the groupBy completes BEFORE the groupFlows are finished processing. This is completely normal. The master stream finishes as soon as there are no more gourps, it has nothing to do with group streams being finished (or at least there is a possible race). In fact every stream finishes as soon as the last element has been processed. In this case the last element is the last group, and processing finishes from the viewpoint of that stream as soon as you finished calling foreach on the substream (which is usually earlier when that foreach itself finishes). Is the Activator example a bad example of how to use streams? It is a faulty example it seems, it demonstrates streams properly, but it demonstrates shutdown order wrongly: shutting down the system happens too early -- it should happen when all writing streams are finished, not when all writing streams has been started. I opened a ticket: https://github.com/typesafehub/activator-akka-stream-scala/issues/18 https://github.com/typesafehub/activator-akka-stream-scala/issues/18 Or is akka streams misbehaving? No, it is not. (is it impossible for a group stream to know that it's groupFlows are completed)? Yes, there is a cookbook sample that is similar: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Implementing_reduce-by-key http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html#Implementing_reduce-by-key Although that recipe is called reduce-by-key it follows a similar pattern. Modification of the activator sample is simple. replace: groupFlow.foreach(line = output.println(line)).onComplete(_ = Try(output.close())) with (pseudocode, won't compile) groupFlow.foreach(line = output.println(line)).andThen(_ = Try(output.close())) This change causes the above statement to return a Future[Unit] instead of just Unit. Now on the master stream use .mapAsyncUnordered(identity).onComplete(close input file) These changes basically do the following: The first change makes the stream of groups become a stream of completion Futures (Future[Unit]), while the second modification causes the stream to flatten these futures by waiting for them (Unordered since we don't care about the order). Now when this new stream finishes, it also means that all Futures has been finished, which means that all output.close() has been called (because of the andThen combinator on the Future), which means that all foreach blocks on the substreams has been finished. This is exactly what you want. Just like it is explained in the recipe you actually want to call: .buffer(MaxGroups, OverflowStrategy.error).mapAsyncUnordered(identity).onComplete(close input file) With MaxGroups being large enough to contain all possible groups. The reason for this is explained in the recipe. -Endre -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Trying to understand a sudden drop in throughput with Akka IO
Endre, thank you again. I think you are correct. It looks like the primary limitation is around not being able to batch more operations in one network call (TCP). I increased the message size (10 times) and I'm able to send more bytes per second. At some point I'll hit the network limit. The following is for 1 million messages of around 10K each. https://lh3.googleusercontent.com/-dQTzB-qMZwI/VJndn-gyogI/vaI/H02pACUqdlE/s1600/rediscala_network_IO_1Million_10kmsgsize.png Can you explain a little more why you won't recommend going any higher than 128 for the buffer size of FlowMaterializer? Also, is there a way I can measure the actual latency distribution while using the akka-streams? Something like HDRHistrogram of the all the requests. Thanks -Soumya On Tuesday, December 23, 2014 4:05:58 AM UTC-5, Akka Team wrote: Hi, On Tue, Dec 23, 2014 at 12:40 AM, Soumya Simanta soumya@gmail.com javascript: wrote: Endre, Thank you for taking the time to explain everything. It was really helpful not only in understanding the streams basics but also to create a better/faster version of what I'm trying to do. Before I go any further I want to say that I love Akka streams and it is going to be a useful API for a lot of my future work. Thanks to the Akka team. I tweaked both the dispatchers settings as well as the type of dispatcher used by default dispatcher. The program still ends up taking a good deal of my CPU (NOTE: The screenshot below is for FJP and not a ThreadpoolExecutor but I see similar usage with TPE). I wouldn't worry too much about CPU usage right now this can be an artifact of various scheduling effects (there is a pinned dispatcher, FJP can also distort measurements). You can try to use several parallel streams instead of one and see how things scale out horizontally. The memory footprint is always under control as excepted. I gave 12G of heap space to the JVM. The frequency of young generation GC depends on the MaterializerSettings buffer sizes. I've not tweaked the GC yet. Do you think that can make a difference ? Since more random elements (boxed integers) are kept in memory longer with higher buffers sizes, this is expected. In reality you would store real domain objects which are already allocated so that is less of an issue. BTW, does the a size of 64 mean that there will be 64 items in each buffer in the pipeline. I bumped it to 512 and saw an increase in throughput. I wouldn't go above 128. Here is the configuration and screenshots of one of the better runs I had. I'm not sure if I'm limited by TCP or how Rediscala is using Akka IO at this point. Any further insights will be very useful and appreciated. In the mean time I'll continue to play around with different values. I believe you maxed out the streams part, so any other bottleneck will be very likely in the Rediscala client or below. Your screenshot shows that around 70MByte/s is achieved which around 0,5Gbit/s. Assuming that TCP is used this is not bad at all. -Endre Thanks again ! My machine config is *Darwin 14.0.0 Darwin Kernel Version 14.0.0: Fri Sep 19 00:26:44 PDT 2014; root:xnu-2782.1.97~2/RELEASE_X86_64 x86_64* Processor Name: Intel Core i7 Processor Speed: 2.6 GHz Number of Processors: 1 Total Number of Cores: 4 L2 Cache (per Core): 256 KB L3 Cache: 6 MB Memory: 16 GB *application.conf * rediscala { rediscala-client-worker-dispatcher { mailbox-type = akka.dispatch.SingleConsumerOnlyUnboundedMailbox throughput = 1000 } } actor { default-dispatcher { type = Dispatcher executor = fork-join-executor default-executor { fallback = fork-join-executor } # This will be used if you have set executor = fork-join-executor fork-join-executor { # Min number of threads to cap factor-based parallelism number to parallelism-min = 5 # Max number of threads to cap factor-based parallelism number to parallelism-max = 5 } throughput = 1000 } } I'm using the following for the FlowMaterializer val settings = MaterializerSettings(system) implicit val materializer = FlowMaterializer(settings.copy(maxInputBufferSize = *512*, initialInputBufferSize = *512*)) https://lh6.googleusercontent.com/-gLBJ7tgfRN4/VJipabIoLgI/vTw/9DdnDszQ55o/s1600/rediscala_network_IO_5Million_backpressure.png https://lh4.googleusercontent.com/-USKroaRYgco/VJipgfoSIOI/vT4/nUU-y9BCRXs/s1600/rediscala_network_IO_5Million_backpressure_threads.png https://lh6.googleusercontent.com/-w7_OA9C5f7k/VJipl0YW8-I/vUA/wpoDmD0F9xM/s1600/rediscala_network_IO_5Million_backpressure_cpu_memory.png On Monday, December 22, 2014 3:56:30 AM UTC-5, Akka Team wrote: Hi Soumya First of all, the performance of Akka IO (the original actor based one) might be slow or fast, but it
Re: [akka-user] A little problem with new HTTP 1.0-M1
Thanks for the clarification... the new API is very different. I need to let this whole flows thing sink in a bit, but I kinda see where its going. Granted this is just one use case but I am a little concerned about all the machinery needed to perform something pretty simple, a GET request in this case. Not sure if I'm doing this right but I end up awaiting twice to get the content I need out of the response--like the code below. It does work... Just appears quite a bit more complex than the Spray version. def httpGet( uri:String )(implicit s:ActorSystem) = { // returns (status_code, entity_as_string) implicit val materializer = FlowMaterializer() var r:HttpResponse = null val req = HttpRequest(HttpMethods.GET, Uri(uri)) val host:String = req.uri.authority.host.toString val port:Int = req.uri.effectivePort val httpClient = Http().outgoingConnection(host,port).flow val consumer = Sink.foreach[HttpResponse] { resp ⇒ r = resp } val finishFuture = Source.single(req).via(httpClient).runWith(consumer) Await.result(finishFuture, Duration(3 seconds)) // unpack result (r.status.intValue, Await.result(r.entity.toStrict(FiniteDuration(3,seconds)), Duration(3 seconds) ).data.utf8String) } -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] akka-http (experimental) file server
Hi Allan, I updated the gist https://gist.github.com/rklaehn/3f26c3f80e5870831f52 to work with 1.0-M2. However, I am not sure if what I do is idiomatic. It seems to work though. On Mon, Dec 22, 2014 at 9:03 PM, Allan Brighton allane...@gmail.com wrote: Is there an example http file server and client like this that works with the latest 1.0-M1 version? On Thursday, November 20, 2014 4:11:38 PM UTC+1, rklaehn wrote: On Thu, Nov 20, 2014 at 3:31 PM, Allan Brighton alla...@gmail.com wrote: Thanks for the http server example. It works fine. Do you also have a matching example for the client side? I found akka.http.TestClient, which is a start, but does not give any hints about reading the data from the server. Sure. Based on the TestClient example, here is one that reads data from the server and dumps it on the console. https://gist.github.com/rklaehn/3f26c3f80e5870831f52#file-client-example However, last time I tried this did not properly handle chunked encoding. But that was 0.10-M1. I will have to try again. Also, I hope there will be a less verbose way to do a simple GET request. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] DistributedPubSubMediator.SendToAll - any need to register the actor in order to receive messages?
Hello! In order to use the pub-sub mediator with SendToAll strategy - what should I do in order to register my current actor with it? Looks like it doesn't find an existing actor, so it seems I need to register the actor as if I would do that for mediator ! Subscribe(content, self) -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: DistributedPubSubMediator.SendToAll - any need to register the actor in order to receive messages?
Oh, I forgot to send a *Put* message to the mediator. That solved the problem. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Exception when persisting the ShardCoordinator
Thanks, Endre A colleague of mine already submitted an issue to them but we haven't received any response yet: https://github.com/dnvriend/akka-persistence-jdbc/issues/9 . Is it possible to have the coordinator's persistence use a different journal than the one used by our own persistent actors? El martes, 23 de diciembre de 2014 14:42:24 UTC-5, Akka Team escribió: Hi Miguel, This seems to be a journal issue. You should contact the maintainer of the JDBC journal. -Endre On Tue, Dec 23, 2014 at 7:43 PM, Miguel Vilá migue...@seven4n.com javascript: wrote: Hi all, I'm having a problem with akka-persistence and akka-sharding. Every now and then, sometimes after running our app for a long time I get this error: DELETE FROM USERSIS.snapshot WHERE persistence_id = '/user/sharding/TokenRouterCoordinator/singleton/coordinator' AND sequence_nr = 2 [ERROR] [12/18/2014 13:51:28.826] [TokenCluster-akka.actor.default- dispatcher-16] [akka://TokenCluster/system/snapshot-store] No more data to read from socket java.sql.SQLRecoverableException: No more data to read from socket at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java: 1157) at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:350) at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:227) at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:531) at oracle.jdbc.driver.T4CPreparedStatement.doOall8( T4CPreparedStatement.java:208) at oracle.jdbc.driver.T4CPreparedStatement.executeForRows( T4CPreparedStatement.java:1046) at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout( OracleStatement.java:1336) at oracle.jdbc.driver.OraclePreparedStatement.executeInternal( OraclePreparedStatement.java:3613) at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate( OraclePreparedStatement.java:3694) at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate( OraclePreparedStatementWrapper.java:1354) at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate( DelegatingPreparedStatement.java:105) at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate( DelegatingPreparedStatement.java:105) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1. apply$mcI$sp(StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply( StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply( StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$NakedExecutor.apply( StatementExecutor.scala:33) at scalikejdbc.StatementExecutor$$anon$1. scalikejdbc$StatementExecutor$LoggingSQLAndTiming$$super$apply( StatementExecutor.scala:317) at scalikejdbc.StatementExecutor$LoggingSQLAndTiming$class.apply( StatementExecutor.scala:264) at scalikejdbc.StatementExecutor$$anon$1. scalikejdbc$StatementExecutor$LoggingSQLIfFailed$$super$apply( StatementExecutor.scala:317) at scalikejdbc.StatementExecutor$LoggingSQLIfFailed$class.apply( StatementExecutor.scala:295) at scalikejdbc.StatementExecutor$$anon$1.apply(StatementExecutor. scala:317) at scalikejdbc.StatementExecutor.executeUpdate(StatementExecutor. scala:337) at scalikejdbc.DBSession$$anonfun$updateWithFilters$1.apply(DBSession .scala:352) at scalikejdbc.DBSession$$anonfun$updateWithFilters$1.apply(DBSession .scala:350) at scalikejdbc.LoanPattern$class.using(LoanPattern.scala:33) at scalikejdbc.ActiveSession.using(DBSession.scala:457) at scalikejdbc.DBSession$class.updateWithFilters(DBSession.scala:349) at scalikejdbc.ActiveSession.updateWithFilters(DBSession.scala:457) at scalikejdbc.DBSession$class.updateWithFilters(DBSession.scala:327) at scalikejdbc.ActiveSession.updateWithFilters(DBSession.scala:457) at scalikejdbc.SQLUpdate$$anonfun$10.apply(SQL.scala:486) at scalikejdbc.SQLUpdate$$anonfun$10.apply(SQL.scala:486) at scalikejdbc.DBConnection$class.autoCommit(DBConnection.scala:183) at scalikejdbc.DB.autoCommit(DB.scala:75) at scalikejdbc.DB$$anonfun$autoCommit$1.apply(DB.scala:218) at scalikejdbc.DB$$anonfun$autoCommit$1.apply(DB.scala:217) at scalikejdbc.LoanPattern$class.using(LoanPattern.scala:33) at scalikejdbc.DB$.using(DB.scala:150) at scalikejdbc.DB$.autoCommit(DB.scala:217) at scalikejdbc.SQLUpdate.apply(SQL.scala:486) at akka.persistence.jdbc.snapshot.GenericStatements$class. deleteSnapshot(Statements.scala:30) at akka.persistence.jdbc.snapshot.OracleSyncSnapshotStore. deleteSnapshot(SnapshotStores.scala:16) at akka.persistence.jdbc.snapshot.JdbcSyncSnapshotStore$class.delete( JdbcSyncSnapshotStore.scala:34) at akka.persistence.jdbc.snapshot.OracleSyncSnapshotStore.delete( SnapshotStores.scala:16) at akka.persistence.snapshot.SnapshotStore$$anonfun$receive$1. applyOrElse(SnapshotStore.scala:44) at
Re: [akka-user] Exception when persisting the ShardCoordinator
Just an off the cuff guess: akka-persistence-jdbc does not provide a ConnectionPoolSettings when creating the connection pool, which means that the defaults will be provided, which does not contain a validation query, which means that the only way to test the validity of the connections to Oracle DB is via the isValid getter, which does not reset the idle timer of the connection, which leads to connections timing out. So if my guess is correct the fix is to add the proper validation query when creating the connection pool, and if memory serves me right the common query for this when it comes to Oracle DB is select 1 from dual. On Tue, Dec 23, 2014 at 11:24 PM, Miguel Vilá miguelvi...@gmail.com wrote: Thanks, Endre A colleague of mine already submitted an issue to them but we haven't received any response yet: https://github.com/dnvriend/akka-persistence-jdbc/issues/9 . Is it possible to have the coordinator's persistence use a different journal than the one used by our own persistent actors? El martes, 23 de diciembre de 2014 14:42:24 UTC-5, Akka Team escribió: Hi Miguel, This seems to be a journal issue. You should contact the maintainer of the JDBC journal. -Endre On Tue, Dec 23, 2014 at 7:43 PM, Miguel Vilá migue...@seven4n.com wrote: Hi all, I'm having a problem with akka-persistence and akka-sharding. Every now and then, sometimes after running our app for a long time I get this error: DELETE FROM USERSIS.snapshot WHERE persistence_id = '/user/sharding/ TokenRouterCoordinator/singleton/coordinator' AND sequence_nr = 2 [ERROR] [12/18/2014 13:51:28.826] [TokenCluster-akka.actor.default- dispatcher-16] [akka://TokenCluster/system/snapshot-store] No more data to read from socket java.sql.SQLRecoverableException: No more data to read from socket at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java: 1157) at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:350) at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:227) at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:531) at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedS tatement.java:208) at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPr eparedStatement.java:1046) at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(Orac leStatement.java:1336) at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(O raclePreparedStatement.java:3613) at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(Ora clePreparedStatement.java:3694) at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate( OraclePreparedStatementWrapper.java:1354) at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate (DelegatingPreparedStatement.java:105) at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate (DelegatingPreparedStatement.java:105) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply $mcI$sp(StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply( StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$$anonfun$executeUpdate$1.apply( StatementExecutor.scala:337) at scalikejdbc.StatementExecutor$NakedExecutor.apply(StatementE xecutor.scala:33) at scalikejdbc.StatementExecutor$$anon$1.scalikejdbc$ StatementExecutor$LoggingSQLAndTiming$$super$apply(StatementExecutor. scala:317) at scalikejdbc.StatementExecutor$LoggingSQLAndTiming$class.apply( StatementExecutor.scala:264) at scalikejdbc.StatementExecutor$$anon$1.scalikejdbc$ StatementExecutor$LoggingSQLIfFailed$$super$apply(StatementExecutor. scala:317) at scalikejdbc.StatementExecutor$LoggingSQLIfFailed$class.apply( StatementExecutor.scala:295) at scalikejdbc.StatementExecutor$$anon$1.apply(StatementExecutor. scala:317) at scalikejdbc.StatementExecutor.executeUpdate(StatementExecutor. scala:337) at scalikejdbc.DBSession$$anonfun$updateWithFilters$1.apply( DBSession.scala:352) at scalikejdbc.DBSession$$anonfun$updateWithFilters$1.apply( DBSession.scala:350) at scalikejdbc.LoanPattern$class.using(LoanPattern.scala:33) at scalikejdbc.ActiveSession.using(DBSession.scala:457) at scalikejdbc.DBSession$class.updateWithFilters(DBSession.scala:349 ) at scalikejdbc.ActiveSession.updateWithFilters(DBSession.scala:457) at scalikejdbc.DBSession$class.updateWithFilters(DBSession.scala:327 ) at scalikejdbc.ActiveSession.updateWithFilters(DBSession.scala:457) at scalikejdbc.SQLUpdate$$anonfun$10.apply(SQL.scala:486) at scalikejdbc.SQLUpdate$$anonfun$10.apply(SQL.scala:486) at scalikejdbc.DBConnection$class.autoCommit(DBConnection.scala:183) at scalikejdbc.DB.autoCommit(DB.scala:75) at scalikejdbc.DB$$anonfun$autoCommit$1.apply(DB.scala:218) at scalikejdbc.DB$$anonfun$autoCommit$1.apply(DB.scala:217) at
Re: [akka-user] Trying to understand a sudden drop in throughput with Akka IO
def insertValues(rnd: String): Flow[Int, Boolean] = { Flow[Int].mapAsyncUnordered(k = redis.set(k + rnd, message)) } val maxSeq = 500 val seqSource = Source( () = (1 to maxSeq).iterator ) *val streamWithSeqSource = seqSource.via(insertValues(random1)).runWith(blackhole)* My understanding is that the next request is send to Redis from the client only after a single Future is completed. Is this correct ? No, the number of allowed uncompleted Futures is defined by the buffer size. If there wouldn't be a parallelization between Futures then there would be no need for an ordered and unordered version of the same operation. Understood. So a map version will always be slower. Is there a way I can batch a bunch of set requests and wait for them to be over before I can send a new batch ? If there would be a version of set that accepts a Seq[] of writes, let's say batchSet then you could use: seqSource.grouped(100).mapAsyncUnordered(ks = redis.batchSet(...)) Where grouped makes maximum 100 sized groups from the stream of elements resulting in a stream of sequences. You need API support for that from the Redis client though. Yeah I tried that. Here is the code and the network IO. Throughput is better, of course at the cost of latency. I've not figured out a way to measure latency. Once I've a reliable way of doing so I can figure out what the difference in latency is. seqSource.grouped(100).mapAsyncUnordered { grp = { val tran = redis.transaction() for (i - grp) yield { tran.set(i + random2, message) } tran.exec() } }.runWith(blackhole) https://lh6.googleusercontent.com/-uheE7cqhSgQ/VJn6arTCdxI/vaY/hNZgyozl1JE/s1600/5million_1k_messages_grouped100.png -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Akka HTTP outgoing connections always initialized in GraphFlows
Hi I am building a simple reverse proxy as an exercise to understand Akka Streams. I have read the new docs but could not find an answer. I'm using FlexiRoute as recommended in an earlier thread to determine when my Akka HTTP service should proxy to the remote service and when not to. But even if the flow does not go through the reverse proxy part of the graph, I still see a remote connection established in the logs. This happens every single time, regardless of input request. It looks as if the connection gets initialized when materializing the flow, after that the connection times out when that part of the flow is never reached. How do I avoid this behaviour? I only want the connection to be established when that part of the graph flow is executed. This is the flow graph setup: val partialBranch = PartialFlowGraph { implicit b ⇒ import FlowGraphImplicits._ val route = new BranchRoute[HttpRequest] val merge = MergePreferred[HttpResponse] in ~ proxyBranch ~ route.in route.success ~ proxyTo ~ con ~ merge route.failure ~ badRequestFlow ~ merge.preferred merge ~ out } Please see this gist for a fully working example: https://gist.github.com/magnusart/bffe1a15dab30f794235 Oh, and merry christmas! :) /Magnus -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.