[akka-user] Akka Stream is not running (without compiler error)
Hi, I'm using Akka Stream M5. I have always been a fan of delayed/lazy execution, kind of like Akka Stream's source->flow->sink and then run(). So I created an array to store functions called: val actionStream: ArrayBuffer[(IntermediateResult) => IntermediateResult] = ArrayBuffer.empty[(IntermediateResult) => IntermediateResult] Then I want to use Akka Stream's mapAsync to execute those stored functions on data, so I wrote this: protected val source: Source[NormalRow, Unit] = Source(() => data. dataIterator) val printSink = Sink.foreach[IntermediateResult](e => save(combine(e._1, e. _2))) val sourceReady = if (actionStream.size == 1) source .via(Flow[NormalRow] .mapAsync(e => {println("inside here"); applyHeadFlow(e, actionStream.head )})) else actionStream.drop(1).foldLeft(source .via(Flow[NormalRow] .mapAsync(e =>applyHeadFlow(e, actionStream.head))) ){(source, action) => source.via(Flow[IntermediateResult].mapAsync(e => Future(action(e } sourceReady.runWith(printSink) And as you might notice, there is a discrepancy between the first action and the subsequent actions (the first works on NormalRow, but the rest works on IntermediateResult), so I created a helper function: def applyHeadFlow(row: NormalRow, action: (IntermediateResult) => IntermediateResult): Future[IntermediateResult] = Future { action.apply(row, None) } I thought this is already and should be fine, but when I tried to run it, there is no output generated. So I start putting "println()" statements inside the mapAsync() or Sink function, but console did not print anything as well. Can anyone care to offer somehelp?? Sincerely, Allen -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Stopping Akka Actor if the Event Stream is empty
1,send some protocol level message to indicate that all file have been handed ,like NoMoreWork,then the actor case at it,and then stop self via context.stop(self) 2,send PoisonPill 3,tick a time out to self I think the first one is OK 在 2015年4月2日星期四 UTC+8上午2:38:05,Syd Gillani写道: > > Hi, > > I have publisher subscriber actor system, where two actors are subscribed > to an Eventstream. Now I am reading the data from a file and publishing it > on an event stream, so how should I stop the actors when all the file is > published. I can't terminate the actor system, as there might be some > unread message in each actor's queue. Cheers > > > Syd. > -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Detecting when all children actors are finished
I am trying to find a way to find out when all children actors are finished working (processed all the messages in their respective queues). Think about graph traversal when each vertex discovery requires its own actor. The depth of the tree is finite and known and each level is requires its own actor, but the breadth is not. The only way I can think of to figure out that there are no more vertices to discover is to implement an event bus. Each actor will send one event (+) when it starts the processing and another (-) whenprocessing is complete. The messages are received by some sort of supervising actor and once the total sum of all messages equals zero traversal is complete. This approach seems to be kludgy and fragile ( I have to introduce some sort of correlation id since multiple graph traversals can be running in parallel). I am wondering if there a simpler solution (monitoring all children's message queues)? -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Stopping Akka Actor if the Event Stream is empty
Hi, I have publisher subscriber actor system, where two actors are subscribed to an Eventstream. Now I am reading the data from a file and publishing it on an event stream, so how should I stop the actors when all the file is published. I can't terminate the actor system, as there might be some unread message in each actor's queue. Cheers Syd. -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: how to tune akka remoting performance
Are you sending all messages in one go? You can compare with this benchmark: https://github.com/akka/akka/tree/release-2.3/akka-samples/akka-sample-remote-scala/src/main/scala/sample/remote/benchmark /Patrik > 1 apr 2015 kl. 15:54 skrev Viktor Klang : > > Looks like you're using Java Serialization, a good first start would be to > switch serializers for your messages. > >> On Wed, Apr 1, 2015 at 3:49 PM, Ivan Balashov wrote: >> Viktor, >> >> Here is the latest configuration that allowed to me reach 30msg/sec between >> 2 x 4cpu hosts in GCE network. >> >> http://goo.gl/sPnjQS >> >> On both sides context switches jumped to 30-50K/sec >> >> Client: >> 0 0 0 1528652 94396 79422800 0 4 468 772 0 0 100 >> 0 >> 2 0 0 1533988 94408 79422400 0 5 9051 16555 11 4 85 >> 0 >> 1 0 0 1326560 94412 79422400 0 6 28710 52792 34 13 >> 54 0 >> 2 0 0 1326188 94412 79422800 0 3 30209 59439 26 8 >> 66 0 >> 1 0 0 1325444 94420 79422800 0 4 26194 51556 24 6 >> 70 0 >> 1 0 0 1328172 94424 79422800 0 1 468 725 0 0 100 >> 0 >> >> Server: >> 3 0 0 827812 30372 206631600 0 3 9762 13188 48 7 45 >> 0 >> 1 0 0 564080 30388 206631600 0 3 19969 23901 29 13 >> 59 0 >> 4 0 0 416900 30388 206631600 0 0 24864 31988 39 13 >> 48 0 >> 1 0 0 241876 30388 206631600 0 3 24113 34808 42 11 >> 47 0 >> 4 0 0 145280 30404 201443200 0 2 21127 40357 44 6 >> 50 0 >> 2 0 0 106608 30404 196002400 0 0 19162 36545 47 6 >> 47 0 >> 4 0 0 138560 30328 186709200 0 6 14574 27803 57 4 >> 39 0 >> 2 0 0 293616 30336 186709200 0 4 6390 10023 42 12 46 >> 0 >> >> Couple more things that concerned me: >> >> 1) [WARN] [04/01/2015 12:54:26.125] [systemB-network-dispatcher-5] >> [akka.tcp://systemB@10.110.112.155:2001/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsystemA%4010.110.47.70%3A2001-0/endpointWriter] >> [ >> 473494] buffered messages in EndpointWriter for >> [akka.tcp://systemA@10.110.47.70:2001]. You should probably implement flow >> control to avoid flooding the remote connection. >> >> Looks like either side is sending more messages than receiving is capable to >> accept. This however, might be cause by 1M network buffer, which, OTOH is >> needed for higher throughput. >> jFTR, GCE network is quite fast (my last measurement gives me 72Mb/sec, >> while during the test we barely hit 10Mb/sec). >> >> 2) Significant heap pressure, apparently caused by 1) and, as a consequence, >> some GC activity. Profiler gives me estimate of ~700bytes per every message >> in the queue, quickly growing heap. The slower messages get processed, the >> more of them are accumulated, the more cpu is needed for GC, chicken-egg. >> >> Any configuration advice to achieve better throughput in this scenario of >> many small messages? >> >> >>> On Wednesday, April 1, 2015 at 10:59:32 AM UTC+3, √ wrote: >>> Could you share your entire config? >>> On Tue, Mar 31, 2015 at 5:31 PM, Ivan Balashov wrote: > On Tuesday, March 31, 2015 at 11:23:25 AM UTC+3, √ wrote: > Sounds like you're using too many FJ threads. I wish it was that simple. For both remoting and actor pool the same dispatcher is used (4 core box): > type = Dispatcher > executor = "fork-join-executor" > throughput = 1000 // Does this apply to FJ, or only to > ThreadPoolEx? > fork-join-executor { > parallelism-min = 1 > parallelism-max = 4 > } I get lower CS values if I set parallelism-max=1, maintaining about the same total throughput only with less cpu burn. However, it looks like CS should not depend much on whether we have 1 or 4 or 10 FJ threads, most switching must be happing on deeper level, e.g. controlled by `throughput`. -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. >>> >>> >>> >>> -- >>> Cheers, >>> √ >> >> -- >> >> Read the docs: http://akka.io/docs/ >> >> Check the FAQ
Re: [akka-user] Re: how to tune akka remoting performance
Looks like you're using Java Serialization, a good first start would be to switch serializers for your messages. On Wed, Apr 1, 2015 at 3:49 PM, Ivan Balashov wrote: > Viktor, > > Here is the latest configuration that allowed to me reach 30msg/sec > between 2 x 4cpu hosts in GCE network. > > http://goo.gl/sPnjQS > > On both sides context switches jumped to 30-50K/sec > > Client: > 0 0 0 1528652 94396 79422800 0 4 468 772 0 0 > 100 0 > 2 0 0 1533988 94408 79422400 0 5 9051 16555 11 4 > 85 0 > 1 0 0 1326560 94412 79422400 0 6 28710 52792 34 13 > 54 0 > 2 0 0 1326188 94412 79422800 0 3 30209 59439 26 8 > 66 0 > 1 0 0 1325444 94420 79422800 0 4 26194 51556 24 6 > 70 0 > 1 0 0 1328172 94424 79422800 0 1 468 725 0 0 > 100 0 > > Server: > 3 0 0 827812 30372 206631600 0 3 9762 13188 48 7 > 45 0 > 1 0 0 564080 30388 206631600 0 3 19969 23901 29 13 > 59 0 > 4 0 0 416900 30388 206631600 0 0 24864 31988 39 13 > 48 0 > 1 0 0 241876 30388 206631600 0 3 24113 34808 42 11 > 47 0 > 4 0 0 145280 30404 201443200 0 2 21127 40357 44 6 > 50 0 > 2 0 0 106608 30404 196002400 0 0 19162 36545 47 6 > 47 0 > 4 0 0 138560 30328 186709200 0 6 14574 27803 57 4 > 39 0 > 2 0 0 293616 30336 186709200 0 4 6390 10023 42 12 > 46 0 > > Couple more things that concerned me: > > 1) [WARN] [04/01/2015 12:54:26.125] [systemB-network-dispatcher-5] > [akka.tcp:// > systemB@10.110.112.155:2001/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsystemA%4010.110.47.70%3A2001-0/endpointWriter] > [ > *473494*] buffered messages in EndpointWriter for [akka.tcp:// > systemA@10.110.47.70:2001]. *You should probably implement flow control > to avoid flooding the remote connection.* > > Looks like either side is sending more messages than receiving is capable > to accept. This however, might be cause by 1M network buffer, which, OTOH > is needed for higher throughput. > jFTR, GCE network is quite fast (my last measurement gives me 72Mb/sec, > while during the test we barely hit 10Mb/sec). > > 2) Significant heap pressure, apparently caused by 1) and, as a > consequence, some GC activity. Profiler gives me estimate of ~700bytes per > every message in the queue, quickly growing heap. The slower messages get > processed, the more of them are accumulated, the more cpu is needed for GC, > chicken-egg. > > Any configuration advice to achieve better throughput in this scenario of > many small messages? > > > On Wednesday, April 1, 2015 at 10:59:32 AM UTC+3, √ wrote: >> >> Could you share your entire config? >> >> On Tue, Mar 31, 2015 at 5:31 PM, Ivan Balashov wrote: >> >>> >>> >>> On Tuesday, March 31, 2015 at 11:23:25 AM UTC+3, √ wrote: Sounds like you're using too many FJ threads. >>> >>> I wish it was that simple. For both remoting and actor pool the same >>> dispatcher is used (4 core box): >>> >>> type = Dispatcher executor = "fork-join-executor" throughput = 1000 // Does this apply to FJ, or only to ThreadPoolEx? fork-join-executor { parallelism-min = 1 parallelism-max = 4 } >>> >>> >>> I get lower CS values if I set parallelism-max=1, maintaining about the >>> same total throughput only with less cpu burn. >>> However, it looks like CS should not depend much on whether we have 1 or >>> 4 or 10 FJ threads, most switching must be happing on deeper level, e.g. >>> controlled by `throughput`. >>> >>> >>> >>> -- >>> >> Read the docs: http://akka.io/docs/ >>> >> Check the FAQ: http://doc.akka.io/docs/akka/ >>> current/additional/faq.html >>> >> Search the archives: https://groups.google.com/ >>> group/akka-user >>> --- >>> You received this message because you are subscribed to the Google >>> Groups "Akka User List" group. >>> To unsubscribe from this group and stop receiving emails from it, send >>> an email to akka-user+...@googlegroups.com. >>> To post to this group, send email to akka...@googlegroups.com. >>> Visit this group at http://groups.google.com/group/akka-user. >>> For more options, visit https://groups.google.com/d/optout. >>> >> >> >> >> -- >> Cheers, >> √ >> > -- > >> Read the docs: http://akka.io/docs/ > >> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to akka-user+unsubscr...@googlegroups.com. > To post to this group, send email to akka-user@
Re: [akka-user] Re: how to tune akka remoting performance
Viktor, Here is the latest configuration that allowed to me reach 30msg/sec between 2 x 4cpu hosts in GCE network. http://goo.gl/sPnjQS On both sides context switches jumped to 30-50K/sec Client: 0 0 0 1528652 94396 79422800 0 4 468 772 0 0 100 0 2 0 0 1533988 94408 79422400 0 5 9051 16555 11 4 85 0 1 0 0 1326560 94412 79422400 0 6 28710 52792 34 13 54 0 2 0 0 1326188 94412 79422800 0 3 30209 59439 26 8 66 0 1 0 0 1325444 94420 79422800 0 4 26194 51556 24 6 70 0 1 0 0 1328172 94424 79422800 0 1 468 725 0 0 100 0 Server: 3 0 0 827812 30372 206631600 0 3 9762 13188 48 7 45 0 1 0 0 564080 30388 206631600 0 3 19969 23901 29 13 59 0 4 0 0 416900 30388 206631600 0 0 24864 31988 39 13 48 0 1 0 0 241876 30388 206631600 0 3 24113 34808 42 11 47 0 4 0 0 145280 30404 201443200 0 2 21127 40357 44 6 50 0 2 0 0 106608 30404 196002400 0 0 19162 36545 47 6 47 0 4 0 0 138560 30328 186709200 0 6 14574 27803 57 4 39 0 2 0 0 293616 30336 186709200 0 4 6390 10023 42 12 46 0 Couple more things that concerned me: 1) [WARN] [04/01/2015 12:54:26.125] [systemB-network-dispatcher-5] [akka.tcp://systemB@10.110.112.155:2001/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsystemA%4010.110.47.70%3A2001-0/endpointWriter] [ *473494*] buffered messages in EndpointWriter for [akka.tcp://systemA@10.110.47.70:2001]. *You should probably implement flow control to avoid flooding the remote connection.* Looks like either side is sending more messages than receiving is capable to accept. This however, might be cause by 1M network buffer, which, OTOH is needed for higher throughput. jFTR, GCE network is quite fast (my last measurement gives me 72Mb/sec, while during the test we barely hit 10Mb/sec). 2) Significant heap pressure, apparently caused by 1) and, as a consequence, some GC activity. Profiler gives me estimate of ~700bytes per every message in the queue, quickly growing heap. The slower messages get processed, the more of them are accumulated, the more cpu is needed for GC, chicken-egg. Any configuration advice to achieve better throughput in this scenario of many small messages? On Wednesday, April 1, 2015 at 10:59:32 AM UTC+3, √ wrote: > > Could you share your entire config? > > On Tue, Mar 31, 2015 at 5:31 PM, Ivan Balashov > wrote: > >> >> >> On Tuesday, March 31, 2015 at 11:23:25 AM UTC+3, √ wrote: >>> >>> Sounds like you're using too many FJ threads. >>> >> >> I wish it was that simple. For both remoting and actor pool the same >> dispatcher is used (4 core box): >> >> type = Dispatcher >>> executor = "fork-join-executor" >>> throughput = 1000 // Does this apply to FJ, or only to >>> ThreadPoolEx? >>> fork-join-executor { >>> parallelism-min = 1 >>> parallelism-max = 4 >>> } >> >> >> I get lower CS values if I set parallelism-max=1, maintaining about the >> same total throughput only with less cpu burn. >> However, it looks like CS should not depend much on whether we have 1 or >> 4 or 10 FJ threads, most switching must be happing on deeper level, e.g. >> controlled by `throughput`. >> >> >> >> >>> -- >> >> Read the docs: http://akka.io/docs/ >> >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> >> Search the archives: https://groups.google.com/group/akka-user >> --- >> You received this message because you are subscribed to the Google Groups >> "Akka User List" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to akka-user+...@googlegroups.com . >> To post to this group, send email to akka...@googlegroups.com >> . >> Visit this group at http://groups.google.com/group/akka-user. >> For more options, visit https://groups.google.com/d/optout. >> > > > > -- > Cheers, > √ > -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] About wait-queue-capacity in akka-persistence-sql-async journal plugin
Does anyone use akka-persistence-sql-async plugin for the akka journal? Is there a limit on its wait-queue-capacity setting? I stored the journal msg in postgres. When I replayed the actor msg, I got "com.github.mauricio.async.db.pool.PoolExhaustedException: There are no objects available and the waitQueue is full". This plugin underlying uses com.github.mauricio:postgresql-async. It seems akka-persistence-sql-async's wait-queue-capacity enable to me to solve the problem after increasing its size from default 10,000 to 100,000. Is there a limit? Snapshot may be an alternative option. Thanks for any advice. -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Http client and chunked encoding
Hi all, I am trying to consume a chunked http stream from the client side. The code is basically identical to the gist https://gist.github.com/rklaehn/3f26c3f80e5870831f52#file-client-example ```scala val printChunksConsumer = Sink.foreach[HttpResponse] { res => if(res.status == StatusCodes.OK) { println("Got 200!") if(res.entity.isChunked) println("Chunky!") res.entity.dataBytes.map { chunk => System.out.write(chunk.toArray) System.out.flush() }.to(Sink.ignore).run() } else println(res.status) } ``` However, it seems that this still does not work. When I do a request that produces a very long chunked response, the map never gets executed. I tried various ways of accessing the chunks: matching on the entity and mapping the chunks, dataBytes, getDataBytes. Nothing seems to make a difference. The server side is definitely working. At least it works like a charm when using curl. This is using akka-http 1.0-M5. Any ideas? Rüdiger -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: how to tune akka remoting performance
Could you share your entire config? On Tue, Mar 31, 2015 at 5:31 PM, Ivan Balashov wrote: > > > On Tuesday, March 31, 2015 at 11:23:25 AM UTC+3, √ wrote: >> >> Sounds like you're using too many FJ threads. >> > > I wish it was that simple. For both remoting and actor pool the same > dispatcher is used (4 core box): > > type = Dispatcher >> executor = "fork-join-executor" >> throughput = 1000 // Does this apply to FJ, or only to >> ThreadPoolEx? >> fork-join-executor { >> parallelism-min = 1 >> parallelism-max = 4 >> } > > > I get lower CS values if I set parallelism-max=1, maintaining about the > same total throughput only with less cpu burn. > However, it looks like CS should not depend much on whether we have 1 or 4 > or 10 FJ threads, most switching must be happing on deeper level, e.g. > controlled by `throughput`. > > > > >> -- > >> Read the docs: http://akka.io/docs/ > >> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to akka-user+unsubscr...@googlegroups.com. > To post to this group, send email to akka-user@googlegroups.com. > Visit this group at http://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Cheers, √ -- >> Read the docs: http://akka.io/docs/ >> Check the FAQ: >> http://doc.akka.io/docs/akka/current/additional/faq.html >> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.