Repository: curator Updated Branches: refs/heads/CURATOR-3.0 1a240520f -> ac5c13268
First pass implementation of group membership Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3029856c Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3029856c Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3029856c Branch: refs/heads/CURATOR-3.0 Commit: 3029856c894a72ed2c5762e0c6acd2c0a8cd3937 Parents: f8f05be Author: randgalt <randg...@apache.org> Authored: Sun Sep 27 16:59:48 2015 -0500 Committer: randgalt <randg...@apache.org> Committed: Sun Sep 27 16:59:48 2015 -0500 ---------------------------------------------------------------------- .../framework/CuratorFrameworkFactory.java | 31 ++-- .../framework/recipes/nodes/GroupMember.java | 140 +++++++++++++++++++ .../src/site/confluence/group-member.confluence | 42 ++++++ .../src/site/confluence/index.confluence | 3 +- .../recipes/nodes/TestGroupMember.java | 79 +++++++++++ 5 files changed, 281 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/3029856c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index dcb2ee6..41ff9cd 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -100,6 +100,24 @@ public class CuratorFrameworkFactory build(); } + /** + * Return the local address as bytes that can be used as a node payload + * + * @return local address bytes + */ + public static byte[] getLocalAddress() + { + try + { + return 
InetAddress.getLocalHost().getHostAddress().getBytes(); + } + catch ( UnknownHostException ignore ) + { + // ignore + } + return new byte[0]; + } + public static class Builder { private EnsembleProvider ensembleProvider; @@ -465,19 +483,6 @@ public class CuratorFrameworkFactory } } - private static byte[] getLocalAddress() - { - try - { - return InetAddress.getLocalHost().getHostAddress().getBytes(); - } - catch ( UnknownHostException ignore ) - { - // ignore - } - return new byte[0]; - } - private CuratorFrameworkFactory() { } http://git-wip-us.apache.org/repos/asf/curator/blob/3029856c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java new file mode 100644 index 0000000..5aa8ca2 --- /dev/null +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.curator.framework.recipes.nodes; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.utils.ZKPaths; +import java.io.Closeable; +import java.util.Arrays; +import java.util.Map; + +/** + * Group membership management. Adds this instance into a group and + * keeps a cache of members in the group + */ +public class GroupMember implements Closeable +{ + private final PersistentEphemeralNode pen; + private final PathChildrenCache cache; + private final String thisId; + private final byte[] payload; + + /** + * @param client client + * @param membershipPath the path to use for membership + * @param thisId ID of this group member. MUST be unique for the group + */ + public GroupMember(CuratorFramework client, String membershipPath, String thisId) + { + this(client, membershipPath, thisId, CuratorFrameworkFactory.getLocalAddress()); + } + + /** + * @param client client + * @param membershipPath the path to use for membership + * @param thisId ID of this group member. MUST be unique for the group + * @param payload the payload to write in our member node + */ + public GroupMember(CuratorFramework client, String membershipPath, String thisId, byte[] payload) + { + this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be null"); + this.payload = Arrays.copyOf(payload, payload.length); + + cache = newPathChildrenCache(client, membershipPath); + pen = newPersistentEphemeralNode(client, membershipPath, thisId, payload); + } + + /** + * Start the group membership. 
Register thisId as a member and begin + * caching all members + */ + public void start() + { + pen.start(); + try + { + cache.start(); + } + catch ( Exception e ) + { + Throwables.propagate(e); + } + } + + /** + * Have thisId leave the group and stop caching membership + */ + @Override + public void close() + { + CloseableUtils.closeQuietly(cache); + CloseableUtils.closeQuietly(pen); + } + + /** + * Return the current view of membership. The keys are the IDs + * of the members. The values are each member's payload + * + * @return membership + */ + public Map<String, byte[]> getCurrentMembers() + { + ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder(); + boolean thisIdAdded = false; + for ( ChildData data : cache.getCurrentData() ) + { + String id = idFromPath(data.getPath()); + thisIdAdded = thisIdAdded || id.equals(thisId); + builder.put(id, data.getData()); + } + if ( !thisIdAdded ) + { + builder.put(thisId, payload); // this instance is always a member + } + return builder.build(); + } + + /** + * Given a full ZNode path, return the member ID + * + * @param path full ZNode path + * @return id + */ + public String idFromPath(String path) + { + return ZKPaths.getNodeFromPath(path); + } + + protected PersistentEphemeralNode newPersistentEphemeralNode(CuratorFramework client, String membershipPath, String thisId, byte[] payload) + { + return new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, ZKPaths.makePath(membershipPath, thisId), payload); + } + + protected PathChildrenCache newPathChildrenCache(CuratorFramework client, String membershipPath) + { + return new PathChildrenCache(client, membershipPath, true); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/3029856c/curator-recipes/src/site/confluence/group-member.confluence ---------------------------------------------------------------------- diff --git a/curator-recipes/src/site/confluence/group-member.confluence 
b/curator-recipes/src/site/confluence/group-member.confluence new file mode 100644 index 0000000..dcb27b3 --- /dev/null +++ b/curator-recipes/src/site/confluence/group-member.confluence @@ -0,0 +1,42 @@ +h1. Group Member + +h2. Description +Group membership management. Adds this instance into a group and keeps a cache of members in the group. + +h2. Participating Classes +* GroupMember + +h2. Usage +h3. Creating a GroupMember +{code} +public GroupMember(CuratorFramework client, + String membershipPath, + String thisId, + byte[] payload) +Parameters: +client - client instance +membershipPath - the path to use for membership +thisId - ID of this group member. MUST be unique for the group +payload - the payload to write in our member node +{code} + +h3. General Usage +GroupMember must be started: +{code} +group.start(); +{code} + +When you are through with the GroupMember instance, you should call close: +{code} +group.close(); +{code} + +NOTE: this will remove the instance from the group + +You can get a current view of the members by calling: +{code} +group.getCurrentMembers(); +{code} + +h2. Error Handling +GroupMember instances internally handle all error states, recreating the node as necessary. http://git-wip-us.apache.org/repos/asf/curator/blob/3029856c/curator-recipes/src/site/confluence/index.confluence ---------------------------------------------------------------------- diff --git a/curator-recipes/src/site/confluence/index.confluence b/curator-recipes/src/site/confluence/index.confluence index 4f3a032..71de8df 100644 --- a/curator-recipes/src/site/confluence/index.confluence +++ b/curator-recipes/src/site/confluence/index.confluence @@ -29,7 +29,8 @@ regarding "Curator Recipes Own Their ZNode/Paths". |[[Tree Cache|tree-cache.html]] \- A utility that attempts to keep all data from all children of a ZK path locally cached. This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc. 
You can register a listener that will get notified when changes occur.| ||Nodes|| -|[[Persistent Ephemeral Node|persistent-ephemeral-node.html]] \- An ephemeral node that attempts to stay present in ZooKeeper, even through connection and session interruptions..| +|[[Persistent Ephemeral Node|persistent-ephemeral-node.html]] \- An ephemeral node that attempts to stay present in ZooKeeper, even through connection and session interruptions.| +|[[Group Member|group-member.html]] \- Group membership management. Adds this instance into a group and keeps a cache of members in the group.| ||Queues|| |[[Distributed Queue|distributed-queue.html]] \- An implementation of the Distributed Queue ZK recipe. Items put into the queue are guaranteed to be ordered (by means of ZK's PERSISTENT_SEQUENTIAL node). If a single consumer takes items out of the queue, they will be ordered FIFO. If ordering is important, use a LeaderSelector to nominate a single consumer.| http://git-wip-us.apache.org/repos/asf/curator/blob/3029856c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java new file mode 100644 index 0000000..3d4b951 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.nodes; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.BaseClassForTests; +import org.apache.curator.test.Timing; +import org.apache.curator.utils.CloseableUtils; +import org.testng.Assert; +import org.testng.annotations.Test; +import java.util.Map; + +public class TestGroupMember extends BaseClassForTests +{ + // NOTE - don't need many tests as this class is just a wrapper around two existing recipes + + @Test + public void testBasic() throws Exception + { + Timing timing = new Timing(); + GroupMember groupMember1 = null; + GroupMember groupMember2 = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + try + { + client.start(); + + groupMember1 = new GroupMember(client, "/member", "1"); + Assert.assertTrue(groupMember1.getCurrentMembers().containsKey("1")); + groupMember1.start(); + + groupMember2 = new GroupMember(client, "/member", "2"); + groupMember2.start(); + + timing.sleepABit(); + + Map<String, byte[]> currentMembers1 = groupMember1.getCurrentMembers(); + Map<String, byte[]> currentMembers2 = groupMember2.getCurrentMembers(); + Assert.assertEquals(currentMembers1.size(), 2); + Assert.assertEquals(currentMembers2.size(), 2); + Assert.assertEquals(currentMembers1, currentMembers2); + Assert.assertTrue(currentMembers1.containsKey("1")); + 
Assert.assertTrue(currentMembers1.containsKey("2")); + + groupMember2.close(); + + timing.sleepABit(); + + currentMembers1 = groupMember1.getCurrentMembers(); + Assert.assertEquals(currentMembers1.size(), 1); + Assert.assertTrue(currentMembers1.containsKey("1")); + Assert.assertFalse(currentMembers1.containsKey("2")); + } + finally + { + CloseableUtils.closeQuietly(groupMember1); + CloseableUtils.closeQuietly(groupMember2); + CloseableUtils.closeQuietly(client); + } + } +}