Re: [akka-user] Re: Cluster aware consistent hashing pool - works only on leader

2015-11-20 Thread Patrik Nordwall
On Fri, Nov 20, 2015 at 8:46 AM, Jim Hazen  wrote:

>
>
> On Thursday, November 19, 2015 at 12:32:38 PM UTC-8, Patrik Nordwall wrote:
>>
>>
>>
>> On Thu, Nov 19, 2015 at 8:06 PM, Jim Hazen  wrote:
>>
>>> Wait, what?  So cluster sharding depends on shared mutable state across
>>> your cluster to work?  As far as I knew, the independent local nodes
>>> managed their state internally, communicated/coordinated via network
>>> protocols and delegated to a master when they needed to determine a shard
>>> owner the first time.  All of this allows for local state, mutated locally
>>> with discovered cluster information.  Is this not the case?  If so, why?
>>> This seems contrary to the Akka/Actor model and many other clustering
>>> strategies.
>>>
>>
>> That is correct, but the decisions taken by the coordinator must be
>> consistent, even when the coordinator crashes and fails over to another
>> node. Therefore, the state of the new coordinator instance must be
>> recovered to the exact same state as the previous coordinator instance. By
>> default the coordinator uses Akka Persistence to store and recover this
>> state. Distributed Data can be used as an alternative for storing this
>> state.
>>
>>
> The coordinator could broadcast or gossip its state to the cluster as it
> builds it.  Or, since the state the coordinator really manages is which
> node owns a particular shard, it could simply request this information
> from live cluster participants as part of coordinator election (or rather
> immediately after).  This should rebuild an accurate shard topology.
> Coordination then proceeds normally.
>

That is pretty much what is done in the ddata mode.
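
For reference, a minimal sketch (assuming Akka 2.4.0 with the
akka-cluster-sharding module; the object and system names are made up) of
switching the coordinator state store to Distributed Data:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object DDataShardingExample {
  // Use Distributed Data instead of Akka Persistence for the coordinator state.
  val config = ConfigFactory.parseString(
    "akka.cluster.sharding.state-store-mode = ddata")
    .withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem", config)
    // ... join the cluster and start ClusterSharding as usual ...
  }
}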


>
>>> My product is using a distributed journal, since we also use persistence
>>> along with cluster sharding.  However we're plagued with clustering issues
>>> when rolling new code and ripple restarting nodes in the cluster.  I was
>>> hoping this would go away in 2.4 when I could go back to a local journal
>>> for sharding state.
>>>
>>
>> That will not work, because when the coordinator fails over to another
>> node it will not have access to the leveldb journal used by the previous
>> coordinator, and it will then recover to the wrong state and you will end
>> up with entity actors with the same id running on several nodes. Not an
>> option.
>>
>
> Are you talking about duplicate IDs within the new active cluster, or
> across both brains in case of a split?
>

I'm not talking about a split at all, yet. I'm talking about a crash of the
coordinator node.


> Dupes within a cluster can be prevented from happening if you rebuild the
> coordinator from the remaining nodes.  Once the new coordinator takes over,
> it could easily broadcast a new authoritative ownership table to each
> node.  If the node notices it had running IDs for shards it no longer owns,
> it can shut them down.  This shouldn't happen much if the coordinator built
> its state from the active nodes in the first place.  In the case of a
> split, the active brain doesn't have much control over the IDs on the other
> side.  It needs to rely on a resolution strategy shared by both sides to
> ensure consistency (which is why you have the split brain resolver).
>
>>
>>
>>>  The assumption being that an inter-cluster network partition was
>>> corrupting the shared state on the distributed journal.  Currently the only
>>> way to recover my cluster in these situations is to shut down all nodes,
>>> remove all shard entries from dynamo and restart the cluster nodes, 1 by
>>> 1.  This is on akka 2.3.10.
>>>
>>
>> We have seen two reasons for this issue.
>> 1) Bugs in the journals, e.g. replaying events in the wrong order.
>> 2) Split brain scenarios (including network partitions, long GC, and
>> system overload) causing a split of the cluster into two separate clusters
>> when using auto-downing. That will result in two coordinators (one in each
>> cluster) writing to the same database and thereby corrupting the event
>> sequence. We recommend manual downing or the Split Brain Resolver instead
>> of auto-downing.
>>
> Splits are inevitable and this implementation feels like it amplifies the
> problem.  Once the inevitable happens you corrupt your state because the
> OSS cluster sharding impl lacks a Split Brain Resolver.  What's worse is
> that this situation doesn't appear to be detected immediately.  The split
> silently continues with both coordinators blindly writing state and
> sharding getting more and more corrupt.  But you don't know how bad things
> are until you attempt to recover shard state from the journal, then blamo,
> you end up with no brains.
>
> It would be nice if the OSS product at least supported the static-quorum
> resolver.  Or honored the akka.cluster.min-nr-of-members value when cluster
> membership changes.

Re: [akka-user] Re: Cluster aware consistent hashing pool - works only on leader

2015-11-19 Thread Jim Hazen
Wait, what?  So cluster sharding depends on shared mutable state across
your cluster to work?  As far as I knew, the independent local nodes managed
their state internally, communicated/coordinated via network protocols and
delegated to a master when they needed to determine a shard owner the first
time.  All of this allows for local state, mutated locally with discovered
cluster information.  Is this not the case?  If so, why? This seems contrary
to the Akka/Actor model and many other clustering strategies.

My product is using a distributed journal, since we also use persistence
along with cluster sharding.  However, we're plagued with clustering issues
when rolling new code and ripple-restarting nodes in the cluster.  I was
hoping this would go away in 2.4, when I could go back to a local journal
for sharding state.  The assumption being that an inter-cluster network
partition was corrupting the shared state on the distributed journal.
Currently the only way to recover my cluster in these situations is to
shut down all nodes, remove all shard entries from dynamo and restart the
cluster nodes, 1 by 1.  This is on akka 2.3.10.

java.lang.IllegalArgumentException: requirement failed: Region
Actor[akka://User/user/sharding/TokenOAuthState#1273636297] not registered:
State(Map(
  -63 -> Actor[akka://User/user/sharding/TokenOAuthState#-1778736418],
  23 -> Actor[akka://User/user/sharding/TokenOAuthState#-1778736418],
  40 -> Actor[akka.tcp://User@172.31.13.56:8115/user/sharding/TokenOAuthState#1939420601],
  33 -> Actor[akka.tcp://User@172.31.13.59:8115/user/sharding/TokenOAuthState#-1679280759],
  50 -> Actor[akka://User/user/sharding/TokenOAuthState#-1778736418],
  -58 -> Actor[akka.tcp://User@172.31.13.59:8115/user/sharding/TokenOAuthState#-1679280759],
  35 -> Actor[akka.tcp://User@172.31.13.59:8115/user/sharding/TokenOAuthState#-1679280759],
  -66 -> Actor[akka.tcp://User@172.31.13.59:8115/user/sharding/TokenOAuthState#-1679280759],
  -23 -> Actor[akka.tcp://User@172.31.13.56:8115/user/sharding/TokenOAuthState#1939420601],
  -11 -> Actor[akka.tcp://User@172.31.13.56:8115/user/sharding/TokenOAuthState#1939420601]),
Map(
  Actor[akka.tcp://User@172.31.13.59:8115/user/sharding/TokenOAuthState#-1679280759] -> Vector(35, -58, -66, 33),
  Actor[akka://User/user/sharding/TokenOAuthState#-1778736418] -> Vector(-63, 23, 50),
  Actor[akka.tcp://User@172.31.13.56:8115/user/sharding/TokenOAuthState#1939420601] -> Vector(-23, 40, -11)),
Set())

On Wednesday, November 18, 2015 at 12:23:28 PM UTC-8, Patrik Nordwall wrote:
>
> Leveldb can't be used for cluster sharding, since that is a local journal. 
> The documentation of persistence has links to distributed journals. 



Re: [akka-user] Re: Cluster aware consistent hashing pool - works only on leader

2015-11-19 Thread Patrik Nordwall
On Thu, Nov 19, 2015 at 8:06 PM, Jim Hazen  wrote:

> Wait, what?  So cluster sharding depends on shared mutable state across
> your cluster to work?  As far as I knew, the independent local nodes managed
> their state internally, communicated/coordinated via network protocols and
> delegated to a master when they needed to determine a shard owner the first
> time.  All of this allows for local state, mutated locally with discovered
> cluster information.  Is this not the case?  If so, why? This seems contrary
> to the Akka/Actor model and many other clustering strategies.
>

That is correct, but the decisions taken by the coordinator must be
consistent, even when the coordinator crashes and fails over to another
node. Therefore, the state of the new coordinator instance must be
recovered to the exact same state as the previous coordinator instance. By
default the coordinator uses Akka Persistence to store and recover this
state. Distributed Data can be used as an alternative for storing this
state.


>
> My product is using a distributed journal, since we also use persistence
> along with cluster sharding.  However we're plagued with clustering issues
> when rolling new code and ripple restarting nodes in the cluster.  I was
> hoping this would go away in 2.4 when I could go back to a local journal
> for sharding state.
>

That will not work, because when the coordinator fails over to another node
it will not have access to the leveldb journal used by the previous
coordinator, and it will then recover to the wrong state and you will end up
with entity actors with the same id running on several nodes. Not an option.


>  The assumption being that an inter-cluster network partition was
> corrupting the shared state on the distributed journal.  Currently the only
> way to recover my cluster in these situations is to shut down all nodes,
> remove all shard entries from dynamo and restart the cluster nodes, 1 by
> 1.  This is on akka 2.3.10.
>

We have seen two reasons for this issue.
1) Bugs in the journals, e.g. replaying events in the wrong order.
2) Split brain scenarios (including network partitions, long GC, and system
overload) causing a split of the cluster into two separate clusters when
using auto-downing. That will result in two coordinators (one in each
cluster) writing to the same database and thereby corrupting the event
sequence. We recommend manual downing or the Split Brain Resolver instead
of auto-downing.
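
For anyone following along, a minimal sketch (the object name is made up;
assuming Akka 2.3.x/2.4.x, where auto-down-unreachable-after defaults to off)
of keeping auto-downing disabled and downing an unreachable member manually
instead:

import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

object ManualDowningExample {
  // Explicitly keep auto-downing off (this is also the default).
  val config = ConfigFactory.parseString(
    "akka.cluster.auto-down-unreachable-after = off")
    .withFallback(ConfigFactory.load())

  // Mark the member at `address` as DOWN. Normally triggered by an operator
  // (e.g. via JMX) or by a split brain resolver, never by a blind timeout.
  def downMember(system: ActorSystem, address: Address): Unit =
    Cluster(system).down(address)
}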

By the way, use the latest version, i.e. 2.3.14 or 2.4.0.

Regards,
Patrik


>
> java.lang.IllegalArgumentException: requirement failed: Region
> Actor[akka://User/user/sharding/TokenOAuthState#1273636297] not registered:
> State(Map(
>   -63 -> Actor[akka://User/user/sharding/TokenOAuthState#-1778736418],
>   23 -> Actor[akka://User/user/sharding/TokenOAuthState#-1778736418],
>   40 -> Actor[akka.tcp://User@172.31.13.56:8115/user/sharding/TokenOAuthState#1939420601],
>   33 -> Actor[akka.tcp://User@172.31.13.59:8115/user/sharding/TokenOAuthState#-1679280759],
>   50 -> Actor[akka://User/user/sharding/TokenOAuthState#-1778736418],
>   -58 -> Actor[akka.tcp://User@172.31.13.59:8115/user/sharding/TokenOAuthState#-1679280759],
>   35 -> Actor[akka.tcp://User@172.31.13.59:8115/user/sharding/TokenOAuthState#-1679280759],
>   -66 -> Actor[akka.tcp://User@172.31.13.59:8115/user/sharding/TokenOAuthState#-1679280759],
>   -23 -> Actor[akka.tcp://User@172.31.13.56:8115/user/sharding/TokenOAuthState#1939420601],
>   -11 -> Actor[akka.tcp://User@172.31.13.56:8115/user/sharding/TokenOAuthState#1939420601]),
> Map(
>   Actor[akka.tcp://User@172.31.13.59:8115/user/sharding/TokenOAuthState#-1679280759] -> Vector(35, -58, -66, 33),
>   Actor[akka://User/user/sharding/TokenOAuthState#-1778736418] -> Vector(-63, 23, 50),
>   Actor[akka.tcp://User@172.31.13.56:8115/user/sharding/TokenOAuthState#1939420601] -> Vector(-23, 40, -11)),
> Set())
>
> On Wednesday, November 18, 2015 at 12:23:28 PM UTC-8, Patrik Nordwall
> wrote:
>>
>> Leveldb can't be used for cluster sharding, since that is a local
>> journal. The documentation of persistence has links to distributed
>> journals.
>



-- 

Patrik Nordwall
Typesafe - Reactive apps on the JVM
Twitter: @patriknw


Re: [akka-user] Re: Cluster aware consistent hashing pool - works only on leader

2015-11-19 Thread Jim Hazen


On Thursday, November 19, 2015 at 12:32:38 PM UTC-8, Patrik Nordwall wrote:
>
>
>
> On Thu, Nov 19, 2015 at 8:06 PM, Jim Hazen wrote:
>
>> Wait, what?  So cluster sharding depends on shared mutable state across
>> your cluster to work?  As far as I knew, the independent local nodes
>> managed their state internally, communicated/coordinated via network
>> protocols and delegated to a master when they needed to determine a shard
>> owner the first time.  All of this allows for local state, mutated locally
>> with discovered cluster information.  Is this not the case?  If so, why?
>> This seems contrary to the Akka/Actor model and many other clustering
>> strategies.
>>
>
> That is correct, but the decisions taken by the coordinator must be
> consistent, even when the coordinator crashes and fails over to another
> node. Therefore, the state of the new coordinator instance must be
> recovered to the exact same state as the previous coordinator instance. By
> default the coordinator uses Akka Persistence to store and recover this
> state. Distributed Data can be used as an alternative for storing this
> state.
>  
>
The coordinator could broadcast or gossip its state to the cluster as it
builds it.  Or, since the state the coordinator really manages is which node
owns a particular shard, it could simply request this information from live
cluster participants as part of coordinator election (or rather immediately
after).  This should rebuild an accurate shard topology.  Coordination then
proceeds normally.

>
>> My product is using a distributed journal, since we also use persistence 
>> along with cluster sharding.  However we're plagued with clustering issues 
>> when rolling new code and ripple restarting nodes in the cluster.  I was 
>> hoping this would go away in 2.4 when I could go back to a local journal 
>> for sharding state.
>>
>
> That will not work, because when the coordinator fails over to another
> node it will not have access to the leveldb journal used by the previous
> coordinator, and it will then recover to the wrong state and you will end
> up with entity actors with the same id running on several nodes. Not an
> option.
>

Are you talking about duplicate IDs within the new active cluster, or 
across both brains in case of a split? Dupes within a cluster can be 
prevented from happening if you rebuild the coordinator from the remaining 
nodes.  Once the new coordinator takes over, it could easily broadcast a 
new authoritative ownership table to each node.  If the node notices it had 
running IDs for shards it no longer owns, it can shut them down.  This 
shouldn't happen much if the coordinator built its state from the active 
nodes in the first place.  In the case of a split, the active brain doesn't 
have much control over the IDs on the other side.  It needs to rely on a
resolution strategy shared by both sides to ensure consistency (which is 
why you have the split brain resolver).
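
Purely as an illustration of the proposal above (this is not how Akka's
coordinator works today, and ShardOwnership/RegionActor are made-up names):

import akka.actor.{Actor, ActorRef}

// shardId -> region that owns it, as announced by the new coordinator
final case class ShardOwnership(owners: Map[String, ActorRef])

class RegionActor extends Actor {
  // shardId -> entity actors this node currently hosts (hypothetical bookkeeping)
  var entitiesByShard = Map.empty[String, Set[ActorRef]]

  def receive = {
    case ShardOwnership(owners) =>
      // Stop entities for shards this node no longer owns per the new table.
      val lost = entitiesByShard.keySet.filterNot(shard => owners.get(shard).exists(_ == self))
      for (shard <- lost; entity <- entitiesByShard(shard)) context.stop(entity)
      entitiesByShard = entitiesByShard -- lost
  }
}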

>  
>
>>  The assumption being that an inter-cluster network partition was 
>> corrupting the shared state on the distributed journal.  Currently the only 
>> way to recover my cluster in these situations is to shut down all nodes, 
>> remove all shard entries from dynamo and restart the cluster nodes, 1 by 
>> 1.  This is on akka 2.3.10.
>>
>
> We have seen two reasons for this issue.
> 1) Bugs in the journals, e.g. replaying events in the wrong order.
> 2) Split brain scenarios (including network partitions, long GC, and
> system overload) causing a split of the cluster into two separate clusters
> when using auto-downing. That will result in two coordinators (one in each
> cluster) writing to the same database and thereby corrupting the event
> sequence. We recommend manual downing or the Split Brain Resolver instead
> of auto-downing.
>
Splits are inevitable and this implementation feels like it amplifies the
problem.  Once the inevitable happens you corrupt your state because the 
OSS cluster sharding impl lacks a Split Brain Resolver.  What's worse is 
that this situation doesn't appear to be detected immediately.  The split 
silently continues with both coordinators blindly writing state and 
sharding getting more and more corrupt.  But you don't know how bad things 
are until you attempt to recover shard state from the journal, then blamo, 
you end up with no brains.

It would be nice if the OSS product at least supported the static-quorum 
resolver.  Or honored the akka.cluster.min-nr-of-members value when cluster 
membership changes.  It would appear that this value isn't checked if the 
members are already in the UP state and the cluster is simply electing a 
new coordinator.  I was thinking of listening for membership changes myself 
and doing that within my app.
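
Sketch of that idea (a rough, app-level approximation, not an existing Akka
feature; QuorumWatcher is a made-up name):

import akka.actor.{Actor, ActorLogging, Props}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus

// Subscribes to membership changes and warns (or could refuse to take on new
// work) when the number of Up members drops below akka.cluster.min-nr-of-members.
class QuorumWatcher extends Actor with ActorLogging {
  val cluster = Cluster(context.system)
  val minMembers =
    context.system.settings.config.getInt("akka.cluster.min-nr-of-members")

  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case state: CurrentClusterState =>
      check(state.members.count(_.status == MemberStatus.Up))
    case _: MemberEvent =>
      check(cluster.state.members.count(_.status == MemberStatus.Up))
  }

  def check(up: Int): Unit =
    if (up < minMembers)
      log.warning("Only {} members Up, below min-nr-of-members = {}", up, minMembers)
}

object QuorumWatcher {
  val props = Props[QuorumWatcher]
}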

Another advantage of getting 

Re: [akka-user] Re: Cluster aware consistent hashing pool - works only on leader

2015-11-18 Thread Patrik Nordwall
Leveldb can't be used for cluster sharding, since that is a local journal.
The documentation of persistence has links to distributed journals.
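
For example, a sketch of pointing Akka Persistence at a distributed journal
(the plugin id "sample-distributed-journal" is a placeholder; use the id
documented by whichever journal plugin, e.g. Cassandra or DynamoDB, you pick):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object DistributedJournalExample {
  // Replace the placeholder with the real plugin id from your journal's docs.
  val config = ConfigFactory.parseString(
    "akka.persistence.journal.plugin = sample-distributed-journal")
    .withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem", config)
  }
}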

An alternative is to use the ddata mode for cluster sharding, then
Distributed Data will be used instead of Persistence for the internal state
of Cluster Sharding.

/Patrik
On Wed, 18 Nov 2015 at 19:33, Jim Hazen wrote:

> You don't need your clustered actors to be persistence aware. So you are
> absolutely free to have sharded stateless actors, or manage state in some
> other way.
>
> The confusing part is that the cluster sharding internals requires akka
> persistence to be configured for at least one journal. The internals use
> this journal to persist internal shard state. Without this journal, cluster
> sharding simply silently fails to work (or used to).
>
> Follow the directions for setting up a local leveldb journal. It is well
> supported by akka and easy to set up. You can then stop worrying about
> persistence and just use the sharding functionality.
>

-- 
>>  Read the docs: http://akka.io/docs/
>>  Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>  Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.