Re: [akka-user] [cluster] minimum number of members and cluster-singleton
Something like downing all members if there's no quorum? I was thinking about that, but there are two problems: - under the current cluster specification, is there a way to resurrect a node? Wouldn't smarter downing require you to add a new state, something like suspended (waiting for enough members to be in the same state)? - wouldn't this be too global? a per-singleton setting doesn't impact the cluster as a whole, you can have in a single cluster some services which require a quorum, and others which can operate even in a split brain scenario If I get some free time I'd be happy to work on the issue, I assume master is a good starting point? Adam On Friday, January 2, 2015 3:58:37 PM UTC+1, Patrik Nordwall wrote: Hi Adam, We have been thinking that this should be handled with a smarter downing strategy, but since it would be easy to implement it in the singleton manager as you propose I think we should consider it. It has already been requested in issue: https://github.com/akka/akka/issues/16535 Cheers, Patrik On Tue, Dec 30, 2014 at 1:07 PM, Adam Warski ad...@warski.org javascript: wrote: Hello, I was wondering how to avoid split-brain issues when using a cluster singleton and automatic downing. Wouldn't it make sense to add a min-nr-of-members setting to the singleton? That is, the singleton wouldn't start unless there's a given number of members in the cluster online. That would solve the split brain problem with singletons. I know there is the per-role min-nr-of-members setting, which impacts the startup of the cluster as a whole, and maybe this could be re-used (currently if nodes start up, and then some die (leaving a smaller number than the setting alive), the singleton starts up anyway). Adam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] [cluster] minimum number of members and cluster-singleton
On Tue, Jan 6, 2015 at 11:01 AM, Adam Warski a...@warski.org wrote: Something like downing all members if there's no quorum? Yes, something like that. From the larger healthy side of the partition mark all unreachable as down, and from the other side shutdown yourself. That will not work well for partitions that split the cluster in 2 groups. I was thinking about that, but there are two problems: - under the current cluster specification, is there a way to resurrect a node? Wouldn't smarter downing require you to add a new state, something like suspended (waiting for enough members to be in the same state)? Unreachable is not a member state, and it can become reachable again. However, some kind of stabilization must be awaited before the decision can be made. - wouldn't this be too global? a per-singleton setting doesn't impact the cluster as a whole, you can have in a single cluster some services which require a quorum, and others which can operate even in a split brain scenario yes, that is a valid point If I get some free time I'd be happy to work on the issue, I assume master is a good starting point? great, master it is /Patrik Adam On Friday, January 2, 2015 3:58:37 PM UTC+1, Patrik Nordwall wrote: Hi Adam, We have been thinking that this should be handled with a smarter downing strategy, but since it would be easy to implement it in the singleton manager as you propose I think we should consider it. It has already been requested in issue: https://github.com/ akka/akka/issues/16535 Cheers, Patrik On Tue, Dec 30, 2014 at 1:07 PM, Adam Warski ad...@warski.org wrote: Hello, I was wondering how to avoid split-brain issues when using a cluster singleton and automatic downing. Wouldn't it make sense to add a min-nr-of-members setting to the singleton? That is, the singleton wouldn't start unless there's a given number of members in the cluster online. That would solve the split brain problem with singletons. I know there is the per-role min-nr-of-members setting, which impacts the startup of the cluster as a whole, and maybe this could be re-used (currently if nodes start up, and then some die (leaving a smaller number than the setting alive), the singleton starts up anyway). Adam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: [akka-persistence] Testing systems which have Persistence
Ah, I did not understand that. I have solved that by starting a new Akka system and a new JavaTestKit in each test, and doing the test code in a static initializer. Before, I switch to the in-mem journal using system properties. In my abstract super-class: *private *ActorSystem *system*; @Before *public void *startAkka() { *system *= ActorSystem.*create*(*TestSys*, ConfigFactory.*load*() .withValue(*akka.persistence.journal.plugin*, *fromAnyRef*( *akka.persistence.journal.inmem*))); } @After *public void *stoppAkka() { *system*.shutdown(); *system*.awaitTermination(); } *protected *ActorSystem system() { *return **system*; } And in my tests: @Test *public void *test() { *new *JavaTestKit(system()) {{ // test code }}; } JavaTestKit is (afaik) just a wrapper around the Akka system, so if you have common code used in more than one test, just send in the system as a parameter, and create a new JavaTestKit. If you use Scala, this can probably (?) be done in a different way, I would not know... Kind regards, Anders kl. 04:04:05 UTC+1 tirsdag 6. januar 2015 skrev Ian Holsman følgende: Thanks for answering Anders, your answers were very helpful. for #1 (starting in a known state). I was more asking how do I start with a 'fresh/clean' journal with no history. I can manually remove the files, but I was hoping there was something in the test-kit I could do which will achieve this so I can do the tests in my IDE. On Mon, Jan 5, 2015 at 6:55 AM, Anders Båtstrand ande...@gmail.com javascript: wrote: I have only been using Akka Persistence for a few months, but this is my suggestions: 1. I always start empty, but that is possibly not what you want. Maybe a file based storage, and you copy the files in place right before you start the Akka system? 2. I have solved the same problem by letting the parent actor be responsible for killing the children. That is: The child receives an event that it calculates should result in it's own destruction, and it then calls the parent, which then kills it (and updates a map over actors). In my case the parent is also delegating messages to the child, so this way I am sure not to send messages to any actor that is on the way down (that is, before the parent received the Terminated-message). 3. I am not sure i understood the question. I have CreateChild and KillChild events persistent from the parent actor. During recovery, it makes a map over actors it should create or not, and only creates then onRecoveryComplete. That way I do not create actors I would immediatly kill. Hope this was helpful! Best regards, Anders Båtstrand kl. 19:20:30 UTC+1 fredag 2. januar 2015 skrev Ian Holsman følgende: Hi. I'm building a 'toy' game that makes uses of akka-persistence, in order to learn it. in a nutshell the game is 1000's of people/bots create villages, they act independently of each other, and generally pass messages to each other to chat/fight and all that good stuff. Where I am having problems are: 1. how do I initialize the persistence system at the start of testing so I am in a 'known state' when I start. 2. how I assign IDs (persistanceId) to actors. Ideally I would use a map reference for while the villages exist, and be able to re-use them if a new village is created. but the problem is it finds the old village, and replays all the events (including the death event which kills it). I don't mind re-using a old actor, but i'd like to stop it (ie poisonpill) when it isn't active. It seems like they need to be unique for the life of the system, and to do so I would need a counter in place or GUIDs. but then I would need to keep track of the Map reference/GUID mapping meaning my code would need to first talk to the mapping holder, and then to the village itself (2 messages, 1 sync) which seems like non-ideal. (as opposed to context.actorselection(XYZ map-reference) ! message which I guess is still 2 messages under the covers, 1 sync too) 3. the recovery log only seems to 'start' when I bring up the actor. Is there a way to know which actors are currently 'active' and just let Akka start them all up, instead of me keeping a list/map of 'active' villages, and manually restarting them. Thanks .. Ian. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email
[akka-user] Naming mismatch? FlowGraphBuilder = Builder of FlowGraph vs GraphFlow
Hi, Notice the naming mismatch - FlowGraphBuilder builds FlowGraph's when there's also the case class GraphFlow. Are the types (supposed to be) working together? What's the reason behind the naming difference? Just ran across it this morning while reviewing the source code and worth to point out that I have no idea what the two types are for. Jacek -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Akka Cluster Project - Monolithic JAR or JARs per service?
Hi Kane, Exploring the different options, one limitation that I can see is that ClusterPoolRouter requires the class of the actor that's going to be remotely deployed to the cluster to be *present on the class path of the router. *That is, if our front-ends are to create a worker on a remote machine to handle a request, the class for that router must be in the JAR on the front-end machine. *Please correct me if I'm mistaken here.* In my experience this is true. In my activator template http://typesafe.com/activator/template/play-akka-cluster-sample I used the same pattern you describe, building an *API* package that contains all the messages in order to distribute the classes over all services. However this means that your spray-frontend has dependencies to *all *API packages. This could get messy if you have a lot of small services (versioning, backwards compatibility, etc.) I'm really interested in other user suggestions :) cheers, Muki -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] BalancingPool with custom mailbox
On Mon, Jan 5, 2015 at 9:18 PM, Pradeep Gollakota pradeep...@gmail.com wrote: Hi Patrik, Thanks for the response. Is there a work around for this I can employ? Is it possible to use a custom mailbox with the old balancing dispatcher (from pre 2.3)? Sorry, not that I know from the top of my head. Then you have to study the referenced issues and pull requests and try the things mentioned there. I would probably look for another solution. /Patrik On Friday, January 2, 2015 7:05:02 AM UTC-8, Patrik Nordwall wrote: Hi Pradeep, Custom mailbox with balancing pool is not possible with Akka 2.3.x. It might be supported later, see https://github.com/akka/ akka/issues/13961 and https://github.com/akka/akka/issues/13964 Regards, Patrik On Tue, Dec 30, 2014 at 8:58 PM, Pradeep Gollakota prade...@gmail.com wrote: Hi All, I’m trying to create an ActorSystem where a set of actors have a shared mailbox that’s prioritized. I’ve tested my mailbox without using the BalancingPool router, and the messages are correctly prioritized. However, when I try to create the actors using BalancingPool, the messages are no longer prioritized. How do I create a BalancingPool router with a custom mailbox? With the following code, the messages are not prioritized: val system = ActorSystem(MySystem) val actor = system.actorOf(BalancingPool(1).props(Props[MyActor]).withMailbox(my-mailbox), myactor) With the following code, the messages are prioritized correctly. val system = ActorSystem(MySystem) val actor = system.actorOf(Props[MyActor].withMailbox(my-mailbox), myactor) Thanks in advance, Pradeep -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/ current/additional/faq.html Search the archives: https://groups.google.com/ group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To post to this group, send email to akka...@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Naming mismatch? FlowGraphBuilder = Builder of FlowGraph vs GraphFlow
Hi Jacek, The public API consist of FlowGraph and FlowGraphBuilder. GraphFlow is an internal implementation detail. Regards, Patrik PS. In my opinion we should rename GraphFlow to something else. It's rather confusing for anyone reading the source code. On Tue, Jan 6, 2015 at 12:04 PM, Jacek Laskowski jacek.japila...@gmail.com wrote: Hi, Notice the naming mismatch - FlowGraphBuilder builds FlowGraph's when there's also the case class GraphFlow. Are the types (supposed to be) working together? What's the reason behind the naming difference? Just ran across it this morning while reviewing the source code and worth to point out that I have no idea what the two types are for. Jacek -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Application.conf not being used under test
I would guess it is a classpath ordering issue. To be sure you load the right config from tests you can use another file name for the test config name. In the tests you load that config and pass to the factory method of the ActorSystem. /Patrik On Mon, Jan 5, 2015 at 1:45 PM, Anders Båtstrand ander...@gmail.com wrote: How do you load the configuration? And could you post the output of Config.origin() after you have loaded the configuration? I had a problem with configuration being overidden, but solved it by reading the javadoc on ConfigFactor.parseResourcesAnySyntax (cleared up a few things for me). Best regards, Anders Båtstrand kl. 04:32:26 UTC+1 mandag 5. januar 2015 skrev manwood følgende: I have a small Akka application with application.conf files under both test and main directories. Initially, when running the tests, the application.conf file under the test directory was being used (as desired). However, whilst building out the application, adding dependencies etc, at some point the test code has started to use the application.conf under main instead. What could have caused this? How can I establish why? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: [akka-persistence] Testing systems which have Persistence
That's exactly what I needed. Thanks Anders. (I put it in my akka.conf) On Tue, Jan 6, 2015 at 2:41 AM, Anders Båtstrand ander...@gmail.com wrote: Ah, I did not understand that. I have solved that by starting a new Akka system and a new JavaTestKit in each test, and doing the test code in a static initializer. Before, I switch to the in-mem journal using system properties. In my abstract super-class: private ActorSystem system; @Before public void startAkka() { system = ActorSystem.create(TestSys, ConfigFactory.load() .withValue(akka.persistence.journal.plugin, fromAnyRef(akka.persistence.journal.inmem))); } @After public void stoppAkka() { system.shutdown(); system.awaitTermination(); } protected ActorSystem system() { return system; } And in my tests: @Test public void test() { new JavaTestKit(system()) {{ // test code }}; } JavaTestKit is (afaik) just a wrapper around the Akka system, so if you have common code used in more than one test, just send in the system as a parameter, and create a new JavaTestKit. If you use Scala, this can probably (?) be done in a different way, I would not know... Kind regards, Anders kl. 04:04:05 UTC+1 tirsdag 6. januar 2015 skrev Ian Holsman følgende: Thanks for answering Anders, your answers were very helpful. for #1 (starting in a known state). I was more asking how do I start with a 'fresh/clean' journal with no history. I can manually remove the files, but I was hoping there was something in the test-kit I could do which will achieve this so I can do the tests in my IDE. On Mon, Jan 5, 2015 at 6:55 AM, Anders Båtstrand ande...@gmail.com wrote: I have only been using Akka Persistence for a few months, but this is my suggestions: 1. I always start empty, but that is possibly not what you want. Maybe a file based storage, and you copy the files in place right before you start the Akka system? 2. I have solved the same problem by letting the parent actor be responsible for killing the children. That is: The child receives an event that it calculates should result in it's own destruction, and it then calls the parent, which then kills it (and updates a map over actors). In my case the parent is also delegating messages to the child, so this way I am sure not to send messages to any actor that is on the way down (that is, before the parent received the Terminated-message). 3. I am not sure i understood the question. I have CreateChild and KillChild events persistent from the parent actor. During recovery, it makes a map over actors it should create or not, and only creates then onRecoveryComplete. That way I do not create actors I would immediatly kill. Hope this was helpful! Best regards, Anders Båtstrand kl. 19:20:30 UTC+1 fredag 2. januar 2015 skrev Ian Holsman følgende: Hi. I'm building a 'toy' game that makes uses of akka-persistence, in order to learn it. in a nutshell the game is 1000's of people/bots create villages, they act independently of each other, and generally pass messages to each other to chat/fight and all that good stuff. Where I am having problems are: 1. how do I initialize the persistence system at the start of testing so I am in a 'known state' when I start. 2. how I assign IDs (persistanceId) to actors. Ideally I would use a map reference for while the villages exist, and be able to re-use them if a new village is created. but the problem is it finds the old village, and replays all the events (including the death event which kills it). I don't mind re-using a old actor, but i'd like to stop it (ie poisonpill) when it isn't active. It seems like they need to be unique for the life of the system, and to do so I would need a counter in place or GUIDs. but then I would need to keep track of the Map reference/GUID mapping meaning my code would need to first talk to the mapping holder, and then to the village itself (2 messages, 1 sync) which seems like non-ideal. (as opposed to context.actorselection(XYZ map-reference) ! message which I guess is still 2 messages under the covers, 1 sync too) 3. the recovery log only seems to 'start' when I bring up the actor. Is there a way to know which actors are currently 'active' and just let Akka start them all up, instead of me keeping a list/map of 'active' villages, and manually restarting them. Thanks .. Ian. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com. To
Re: [akka-user] Reproducing akka.remote.OversizedPayloadException locally
Hi, It is not possible to simulate OversizedPayloadException in test using local actor system. You have to start two remote/cluster actor systems and send over the configured port. The send-buffer-size and receive-buffer-size attempt to inform the OS how much buffering it should do on our behalf. If you set very large numbers here you will probably see adverse effects (increased latency, heartbeats being missed because our internal prioritization is less effective, etc.), and if you set very small numbers you will experience lower overall throughput. Normally this does not require tuning. They do not have to correlate with maximum-frame-size. Regards, Patrik On Sun, Jan 4, 2015 at 12:56 AM, Maksym Besida maks.bes...@gmail.com wrote: A few days ago I stumbled across *akka.remote.OversizedPayloadException*. Successfully I found the solution in this user list: the problem was in *akka.remote.netty.tcp.maximum-frame-size* config parameter which is set to128000 bytes by default. My question is how can I reproduce this exception locally, because I tried to send messages with size exceeding this value in the tests but didn't get this exception, only when messages are passing between application instances(seperate JVMs). Also another question is about *akka.remote.netty.tcp.**receive-buffer-size *and *akka.remote.netty.tcp.**send-buffer-size. *Do they have to correlate with *maximum-frame-size *as in default config? What are the consequences if they don't correlate? Thanks in advance! -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Customizing failure-zones in distributed publish subscribe
Hi Muthu, On Mon, Jan 5, 2015 at 7:00 AM, Muthukumaran Kothandaraman muthu.kmk2...@gmail.com wrote: I have following requirements for designing a clusterwide messaging system 1. There can be many to many relationship between topics and subscribers. ie. one module (OSGi) in each cluster node can subscribe to more than one topic and each topic can be subscribed to by more than one module. So, I have created one subscriber actor per-module per-topic combination to avoid issues like head-of-line-blocking and to fairly distribute the processing across multiple actors 2. Subscriber actors likely to invoke some legacy code which may not be very finely tuned for latency and hence there could be different processing-latencies for the same message of same topic by different modules 3. Publish is invoked directly in the caller thread where message originates (ie. from within existing code which is not actor-based). Since 'tell' to mediator is non-blocking, I am assuming that this should be fine (pls correct me if I am making any wrong assumptions here) Yes, that is fine My topics-module combinations lead to upto max 100 actors running in each node of cluster. My thoughts on creating failure zone is now limited to only the subscribers part because I create only subscriber actors and publishing entirely happens in caller-thread. So, what are possible ways to create failure-zones a. topic-wise and b. topic+subscriber combinationwise (somehow, this appears to be a more complex part) What do you mean by failure-zones? What do you want to achieve or protect against? Regards, Patrik Any experiences and/or recommendations ? Thanks in advance - Muthu -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Unordered merge of 2..n streams created by groupBy
Further studying convinces me that this is not yet implemented. I'm going to pursue the ActorPublisher / ActorSubscriber route; thank you for this documentation page http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-integrations.html, Akka-Stream folks! I'll post my solution when done, in the case someone else needs this as well. On Tuesday, January 6, 2015 11:39:22 AM UTC-7, Tim Harper wrote: I am modeling a data processing pipeline where there are several decision points, where the system may refer to a human being to make a decision. Since earlier decisions change the state of the system and affect later decisions, another goal of mine is to process a narrow sliding view of the data and have go not too much faster than humans (another goal: since these decisions potentially take time and it may need to sort through several items that the machine decides it can handle, I want the system to buffer user-decisions so work requests are ready for them when they ask for them, and then, naturally, back-pressure when the buffer is full). In summary, my goal is to be able to merge streams in an unordered fashion. *Lengthy pseudo-code* follows for clarification: val source = Source(docsToHandle). mapAsync(withCurrentHeadVersion). groupByAsync(attachDecisionDecider.decide). map[Source[Either[Terminal, DocForProcessing]]] { case (AttachDecisionDecider.AlreadyAttached, flow) = flow map { doc = Right(doc) } case (AttachDecisionDecider.ObviousDuplicate, flow) = flow.mapAsync { doc = (entityState ? EntityState.AttachDoc(doc.headEntityVersion, doc.path)).mapTo[Option[Int]] map { case None = // conflict! Note: we may want to consider a max-retry signal that dumps to another table when abandoning; this can be handled by our requeue logic down the stream Left(ProcessConflict(doc.path)) case Some(version) = Right(doc.copy(headEntityVersion = Some(version))) // Note - it may actually be best to requery this later, to tighten the gap between decision and acting on the decision. } } case (AttachDecisionDecider.HumanDecisionNeeded, flow) = flow. map(referToHuman(attach, _)). buffer(20, OverflowStrategy.backpressure). // enque up to 20 unacked human requests mapAsyncUnordered(identity). // at least one human has begun the work (on abandon, future will be abandoned) mapAsync(identity) // human has completed the work }. mergeUnordered. mapConcat(handleResult). groupByAsync(pickDecisionDecider.decide). map[Source[Either[Terminal, DocForProcessing]]] { case (PickDecisionDecider.AlreadyPicked, flow) = flow map { doc = Right(doc) } case (PickDecisionDecider.AutoMerge, flow) = flow mapAsync { doc = (entityState ? EntityState.PickDoc(doc.headEntityVersion, doc.path)).mapTo[Option[Int]] map { case None = Left(ProcessConflict(doc.path)) case Some(version) = Right(doc.copy(headEntityVersion = Some(version))) } } case (PickDecisionDecider.HumanDecisionNeeded, flow) = flow. map(referToHuman(pick, _)). buffer(20, OverflowStrategy.backpressure). // enque up to 20 unacked human requests mapAsyncUnordered(identity). // at least one human has begun the work (on abandon, future will be abandoned) mapAsync(identity) // human has completed the work }. mergeUnordered. mapConcat(handleResult). foreach { doc = // everything that makes it this far is done; foreach consumer will produce a constant pull on the stream. ack(doc.path) } It looks like Merge unordered has been implemented for FlowGraphs http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2/?_ga=1.205107984.1839818538.1375706610#akka.stream.scaladsl.Merge, but I don't see a GroupBy junction defined, just Broadcast. So, I'm not sure if I can access this feature. It looks like I might be able to define an ActorPublisher and ActorSubscribe to achieve my goal; IteratorPublisher.scala https://github.com/akka/akka/blob/releasing-akka-stream-and-http-experimental-1.0-M2/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala is fine enough documentation. So, this is my working plan; but wanted to check to see if I was missing some undocumented feature first or overlooking something that has already been done in this area. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this
Re: [akka-user] [akka-streams] Duplicator example from the docs
Ok, if there's only one ball, that answers the question. Thanks! :) Adam On Thursday, January 1, 2015 10:00:27 AM UTC+1, Akka Team wrote: Hi Adam, PushPull stage callbacks are never concurrent. Also, if a stage calls ctx.Push then it will receive eventually an onPull, and whenever it calls ctx.pull, it will receive an onPush eventually (of course completion events can come at any time). I'm wondering if there are any guarantees on the order of calling onPush() and onPull()? For example, that onPush() won't be called twice in succession, but that onPush() and onPull calls are interleaved? Otherwise we may end up not duplicating elements (since we only remember the last one). You can imagine a series of stages like a pipe and a bouncing ball. When receiving onPush() the ball just bounced into that segment of the pipe. If you call ctx.push as a response to this, then the ball just continues downwards. If you call ctx.pull, then it will bounce back upwards. There is only one ball (unless you use DetachedOps but they are an advanced concept). Currently there is no exposed API to fuse multiple operations into such a synchronous pipeline so all stages will run in a separate actor, but that does not change their behavior (a single stage in an actor is actually three stages, two boundary ones that talk to the actor, and your stage sandwiched in-between) so the above mental model works. These stages are completely unbuffered and there is only one activated stage at a given time (in a given synchronous island). I guess this depends if there is any buffering around a custom push-pull-stage, but I would expect it to have a default buffer of awaiting elements? There is never an implicit buffer there, unless you put your own explicit buffering stage. -Endre Adam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Akka Team Typesafe - The software stack for applications that scale Blog: letitcrash.com Twitter: @akkateam -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Getting strange error 2.4-Snapshot with stream+http 1.0-M2
Hello, I've got this code: def httpGet( uri:String )(implicit s:ActorSystem) = { implicit val materializer = FlowMaterializer() var r:HttpResponse = null val req = HttpRequest(HttpMethods.GET, Uri(uri)) val host:String = req.uri.authority.host.toString val port:Int = req.uri.effectivePort val httpClient = Http().outgoingConnection(host,port).flow val consumer = Sink.foreach[HttpResponse] { resp ⇒ r = resp } *val finishFuture = Source.single(req).via(httpClient).runWith(consumer)* *// -- GOES BOOM HERE* Await.result(finishFuture, Duration(3 seconds)) // unpack result (r.status.intValue, Await.result(r.entity.toStrict(FiniteDuration(3,seconds)), Duration(3 seconds) ) .data .utf8String) } This works pretty well--for a while. When run in a long-running process, eventually it dies with the exception below. Any idea what's cause it? Thanks for any ideas, Greg Uncaught error from thread [overlord-akka.actor.default-dispatcher-143] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[overlord] java.lang.NoSuchMethodError: akka.pattern.AskableActorRef$.$qmark$extension(Lakka/actor/ActorRef;Ljava/lang/Object;Lakka/util/Timeout;)Lscala/concurrent/Future; at akka.stream.impl.ActorBasedFlowMaterializer.actorOf(ActorBasedFlowMaterializer.scala:471) at akka.stream.impl.ActorBasedFlowMaterializer.actorOf(ActorBasedFlowMaterializer.scala:458) at akka.stream.impl.ActorBasedFlowMaterializer.materializeJunction(ActorBasedFlowMaterializer.scala:506) at akka.stream.scaladsl.FlowGraph$$anonfun$6$$anonfun$apply$8.apply(FlowGraph.scala:1248) at akka.stream.scaladsl.FlowGraph$$anonfun$6$$anonfun$apply$8.apply(FlowGraph.scala:1212) at akka.stream.impl.DirectedGraphBuilder.edgePredecessorBFSfoldLeft(DirectedGraphBuilder.scala:240) at akka.stream.scaladsl.FlowGraph$$anonfun$6.apply(FlowGraph.scala:1212) at akka.stream.scaladsl.FlowGraph$$anonfun$6.apply(FlowGraph.scala:1209) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:105) at akka.stream.scaladsl.FlowGraph.runGraph(FlowGraph.scala:1209) at akka.stream.scaladsl.FlowGraph.run(FlowGraph.scala:1184) at akka.stream.scaladsl.Source$class.runWith(Source.scala:38) at akka.stream.scaladsl.GraphSource.runWith(GraphFlow.scala:128) *at com.rs.overlord.Util$.httpGet(Util.scala:29) -- HIGHLIGHTED LINE IN ABOVE CODE* at com.rs.overlord.OverlordActor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$1.apply$mcV$sp(OverlordActor.scala:32) at com.rs.overlord.OverlordActor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$1.apply(OverlordActor.scala:30) at com.rs.overlord.OverlordActor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$1.apply(OverlordActor.scala:30) at scala.util.Try$.apply(Try.scala:161) at com.rs.overlord.OverlordActor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply(OverlordActor.scala:30) at com.rs.overlord.OverlordActor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply(OverlordActor.scala:29) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403) at com.rs.overlord.OverlordActor$$anonfun$receive$1.applyOrElse(OverlordActor.scala:29) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at com.rs.overlord.OverlordActor.aroundReceive(OverlordActor.scala:24) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)2015-01-06 22:33:46.970 ERROR [overlord-akka.actor.default-dispatcher-156] Uncaught error from thread [over -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Akka Cluster Project - Monolithic JAR or JARs per service?
We're also deploying an Akka cluster on CoreOS with Docker. We deploy the same fat JAR for every node (in fact, the exact same docker image) and then change their behavior by setting the role via environment variables. In our case, each role has a bootstrap class which sets up whatever role-specific actors might be needed (but *not* the worker actors) as well as the actors needed on every node. Cluster-aware routers running on our supervisor node are responsible for actually creating the worker actors as needed. We still split each role into sub-projects in our SBT project. We then have an SBT project within that called node which aggregates all of the service-specific sub-projects into a single JAR and builds the Docker image using sbt-docker. This way, we can iterate quickly on code within a specific service and keep our compile/test times down. That node project also houses our multi-JVM end-to-end tests. Then, opposite that, is our commons project which every other project depends on. The service projects explicitly do *not* depend on each other, only commons. This keeps us from getting them too coupled. Since our service isn't directly user-facing, we don't bother trying to upgrade a single service at a time, we just restart the whole thing and let it pull down the new Docker image if needed. Most of this is written up on our blog: http://blog.conspire.com/post/64130417462/akka-at-conspire-part-1-how-we-built-our-backend-on I've been meaning to write about our transition from Chef/Vagrant to CoreOS/Docker but I haven't found the time yet. Hopefully within the next few weeks (which, of course, I said a month ago). On Sunday, January 4, 2015 6:42:21 PM UTC-7, Kane Rogers wrote: Hi, hAkkers! We're in the process of moving our distributed Akka service from the dark ages of remoting and manual management of IPs (shudder) into the wonderful new world of Akka Cluster. Currently, our project is split up something like this: - spray-frontend - worker-1 - worker-2 - worker-3 Where the spray-frontend forwards messages to the different worker, depending on the type of job. In our current environment, each of these projects are deployed as individual fat JARs using sbt-assembly, and deployed onto individual nodes. In our planned environment, we'll be deploying these fat JARs into docker containers and allow CoreOS to take care of distributing the nodes. We're toying with things like roles, ClusterPoolRouter and ClusterGroupRouter to take care of distributing work amongst the correct node - but nothing is set in stone yet. This then begs the question - how should these nodes be deployed? I can see a couple of possibilities: - Docker container with fat JAR per project (eg. spray-frontend container, worker-1 container etc. etc). - Docker container with fat JAR containing all projects (eg. one container containing code for spray-frontend AND worker-1 etc.). Role is then set via environment variable, or a different main class is fired off on startup. Exploring the different options, one limitation that I can see is that ClusterPoolRouter requires the class of the actor that's going to be remotely deployed to the cluster to be *present on the class path of the router. *That is, if our front-ends are to create a worker on a remote machine to handle a request, the class for that router must be in the JAR on the front-end machine. *Please correct me if I'm mistaken here.* The advantage we've found in splitting the project up into these different sub-projects is tests are a lot quicker, code is smaller, etc. etc. Upgrades are also then made easier, as only certain machines have to be upgraded/restarted if a component of the service is improved/fixed. We also have a shared project between the different services that contains the dialect (eg. the different case classes for message sent between services). This was a best practice that we read about when we first went down the Akka path a couple of years ago, but things may have changed since then! Any suggestions, past experience, pointers to articles to read, activator templates or even just general advice would be really appreciated! Thanks and kind regards, Kane -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Re: Application.conf not being used under test
See also -Dconfig.trace=loads (documented at https://github.com/typesafehub/config#debugging-your-configuration ) That should tell you what's happening. Havoc On Tue, Jan 6, 2015 at 8:34 AM, Patrik Nordwall patrik.nordw...@gmail.com wrote: I would guess it is a classpath ordering issue. To be sure you load the right config from tests you can use another file name for the test config name. In the tests you load that config and pass to the factory method of the ActorSystem. /Patrik On Mon, Jan 5, 2015 at 1:45 PM, Anders Båtstrand ander...@gmail.com wrote: How do you load the configuration? And could you post the output of Config.origin() after you have loaded the configuration? I had a problem with configuration being overidden, but solved it by reading the javadoc on ConfigFactor.parseResourcesAnySyntax (cleared up a few things for me). Best regards, Anders Båtstrand kl. 04:32:26 UTC+1 mandag 5. januar 2015 skrev manwood følgende: I have a small Akka application with application.conf files under both test and main directories. Initially, when running the tests, the application.conf file under the test directory was being used (as desired). However, whilst building out the application, adding dependencies etc, at some point the test code has started to use the application.conf under main instead. What could have caused this? How can I establish why? -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
Re: [akka-user] Customizing failure-zones in distributed publish subscribe
Thanks Patrik. By failure-zones, I mean to assign different dispatchers to different combinations of topic + subscribers. For example, Subscriber A and Subscriber B both handle messages from topic C and hence there would be two subscribers - actor sub_a_topic_c and actor sub_b_topic_c (I choose a convention to name actors with the combination to distinguish different subscribers for same topic). In above example, I could assign different dispatchers - one each for two subscribers based on latencies of message-handling by subscribers, so that processing latencies by one actor does not affect the other for same topic. Is this approach fine in above context ? Of course the decision to separate dispatchers would be taken only after thorough measurement of latencies - Muthu On Tuesday, 6 January 2015 20:31:54 UTC+5:30, Patrik Nordwall wrote: Hi Muthu, On Mon, Jan 5, 2015 at 7:00 AM, Muthukumaran Kothandaraman muthu@gmail.com javascript: wrote: I have following requirements for designing a clusterwide messaging system 1. There can be many to many relationship between topics and subscribers. ie. one module (OSGi) in each cluster node can subscribe to more than one topic and each topic can be subscribed to by more than one module. So, I have created one subscriber actor per-module per-topic combination to avoid issues like head-of-line-blocking and to fairly distribute the processing across multiple actors 2. Subscriber actors likely to invoke some legacy code which may not be very finely tuned for latency and hence there could be different processing-latencies for the same message of same topic by different modules 3. Publish is invoked directly in the caller thread where message originates (ie. from within existing code which is not actor-based). Since 'tell' to mediator is non-blocking, I am assuming that this should be fine (pls correct me if I am making any wrong assumptions here) Yes, that is fine My topics-module combinations lead to upto max 100 actors running in each node of cluster. My thoughts on creating failure zone is now limited to only the subscribers part because I create only subscriber actors and publishing entirely happens in caller-thread. So, what are possible ways to create failure-zones a. topic-wise and b. topic+subscriber combinationwise (somehow, this appears to be a more complex part) What do you mean by failure-zones? What do you want to achieve or protect against? Regards, Patrik Any experiences and/or recommendations ? Thanks in advance - Muthu -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com javascript:. To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
[akka-user] Unordered merge of 2..n streams created by groupBy
I am modeling a data processing pipeline where there are several decision points, where the system may refer to a human being to make a decision. Since earlier decisions change the state of the system and affect later decisions, another goal of mine is to process a narrow sliding view of the data and have go not too much faster than humans (another goal: since these decisions potentially take time and it may need to sort through several items that the machine decides it can handle, I want the system to buffer user-decisions so work requests are ready for them when they ask for them, and then, naturally, back-pressure when the buffer is full). In summary, my goal is to be able to merge streams in an unordered fashion. *Lengthy pseudo-code* follows for clarification: val source = Source(docsToHandle). mapAsync(withCurrentHeadVersion). groupByAsync(attachDecisionDecider.decide). map[Source[Either[Terminal, DocForProcessing]]] { case (AttachDecisionDecider.AlreadyAttached, flow) = flow map { doc = Right(doc) } case (AttachDecisionDecider.ObviousDuplicate, flow) = flow.mapAsync { doc = (entityState ? EntityState.AttachDoc(doc.headEntityVersion, doc.path)).mapTo[Option[Int]] map { case None = // conflict! Note: we may want to consider a max-retry signal that dumps to another table when abandoning; this can be handled by our requeue logic down the stream Left(ProcessConflict(doc.path)) case Some(version) = Right(doc.copy(headEntityVersion = Some(version))) // Note - it may actually be best to requery this later, to tighten the gap between decision and acting on the decision. } } case (AttachDecisionDecider.HumanDecisionNeeded, flow) = flow. map(referToHuman(attach, _)). buffer(20, OverflowStrategy.backpressure). // enque up to 20 unacked human requests mapAsyncUnordered(identity). // at least one human has begun the work (on abandon, future will be abandoned) mapAsync(identity) // human has completed the work }. mergeUnordered. mapConcat(handleResult). groupByAsync(pickDecisionDecider.decide). map[Source[Either[Terminal, DocForProcessing]]] { case (PickDecisionDecider.AlreadyPicked, flow) = flow map { doc = Right(doc) } case (PickDecisionDecider.AutoMerge, flow) = flow mapAsync { doc = (entityState ? EntityState.PickDoc(doc.headEntityVersion, doc.path)).mapTo[Option[Int]] map { case None = Left(ProcessConflict(doc.path)) case Some(version) = Right(doc.copy(headEntityVersion = Some(version))) } } case (PickDecisionDecider.HumanDecisionNeeded, flow) = flow. map(referToHuman(pick, _)). buffer(20, OverflowStrategy.backpressure). // enque up to 20 unacked human requests mapAsyncUnordered(identity). // at least one human has begun the work (on abandon, future will be abandoned) mapAsync(identity) // human has completed the work }. mergeUnordered. mapConcat(handleResult). foreach { doc = // everything that makes it this far is done; foreach consumer will produce a constant pull on the stream. ack(doc.path) } It looks like Merge unordered has been implemented for FlowGraphs http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2/?_ga=1.205107984.1839818538.1375706610#akka.stream.scaladsl.Merge, but I don't see a GroupBy junction defined, just Broadcast. So, I'm not sure if I can access this feature. It looks like I might be able to define an ActorPublisher and ActorSubscribe to achieve my goal; IteratorPublisher.scala https://github.com/akka/akka/blob/releasing-akka-stream-and-http-experimental-1.0-M2/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala is fine enough documentation. So, this is my working plan; but wanted to check to see if I was missing some undocumented feature first or overlooking something that has already been done in this area. -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.