Repository: curator
Updated Branches:
  refs/heads/CURATOR-3.0 1a240520f -> ac5c13268


First pass implementation of group membership


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/3029856c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/3029856c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/3029856c

Branch: refs/heads/CURATOR-3.0
Commit: 3029856c894a72ed2c5762e0c6acd2c0a8cd3937
Parents: f8f05be
Author: randgalt <randg...@apache.org>
Authored: Sun Sep 27 16:59:48 2015 -0500
Committer: randgalt <randg...@apache.org>
Committed: Sun Sep 27 16:59:48 2015 -0500

----------------------------------------------------------------------
 .../framework/CuratorFrameworkFactory.java      |  31 ++--
 .../framework/recipes/nodes/GroupMember.java    | 140 +++++++++++++++++++
 .../src/site/confluence/group-member.confluence |  42 ++++++
 .../src/site/confluence/index.confluence        |   3 +-
 .../recipes/nodes/TestGroupMember.java          |  79 +++++++++++
 5 files changed, 281 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/3029856c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index dcb2ee6..41ff9cd 100644
--- 
a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ 
b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -100,6 +100,24 @@ public class CuratorFrameworkFactory
             build();
     }
 
+    /**
+     * Return the local address as bytes that can be used as a node payload
+     *
+     * @return local address bytes
+     */
+    public static byte[] getLocalAddress()
+    {
+        try
+        {
+            return InetAddress.getLocalHost().getHostAddress().getBytes();
+        }
+        catch ( UnknownHostException ignore )
+        {
+            // ignore
+        }
+        return new byte[0];
+    }
+
     public static class Builder
     {
         private EnsembleProvider ensembleProvider;
@@ -465,19 +483,6 @@ public class CuratorFrameworkFactory
         }
     }
 
-    private static byte[] getLocalAddress()
-    {
-        try
-        {
-            return InetAddress.getLocalHost().getHostAddress().getBytes();
-        }
-        catch ( UnknownHostException ignore )
-        {
-            // ignore
-        }
-        return new byte[0];
-    }
-
     private CuratorFrameworkFactory()
     {
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/3029856c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
new file mode 100644
index 0000000..5aa8ca2
--- /dev/null
+++ 
b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/GroupMember.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.nodes;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ZKPaths;
+import java.io.Closeable;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Group membership management. Adds this instance into a group and
+ * keeps a cache of members in the group
+ */
+public class GroupMember implements Closeable
+{
+    private final PersistentEphemeralNode pen;
+    private final PathChildrenCache cache;
+    private final String thisId;
+    private final byte[] payload;
+
+    /**
+     * @param client client
+     * @param membershipPath the path to use for membership
+     * @param thisId ID of this group member. MUST be unique for the group
+     */
+    public GroupMember(CuratorFramework client, String membershipPath, String 
thisId)
+    {
+        this(client, membershipPath, thisId, 
CuratorFrameworkFactory.getLocalAddress());
+    }
+
+    /**
+     * @param client client
+     * @param membershipPath the path to use for membership
+     * @param thisId ID of this group member. MUST be unique for the group
+     * @param payload the payload to write in our member node
+     */
+    public GroupMember(CuratorFramework client, String membershipPath, String 
thisId, byte[] payload)
+    {
+        this.thisId = Preconditions.checkNotNull(thisId, "thisId cannot be 
null");
+        this.payload = Arrays.copyOf(payload, payload.length);
+
+        cache = newPathChildrenCache(client, membershipPath);
+        pen = newPersistentEphemeralNode(client, membershipPath, thisId, 
payload);
+    }
+
+    /**
+     * Start the group membership. Register thisId as a member and begin
+     * caching all members
+     */
+    public void start()
+    {
+        pen.start();
+        try
+        {
+            cache.start();
+        }
+        catch ( Exception e )
+        {
+            Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Have thisId leave the group and stop caching membership
+     */
+    @Override
+    public void close()
+    {
+        CloseableUtils.closeQuietly(cache);
+        CloseableUtils.closeQuietly(pen);
+    }
+
+    /**
+     * Return the current view of membership. The keys are the IDs
+     * of the members. The values are each member's payload
+     *
+     * @return membership
+     */
+    public Map<String, byte[]> getCurrentMembers()
+    {
+        ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
+        boolean thisIdAdded = false;
+        for ( ChildData data : cache.getCurrentData() )
+        {
+            String id = idFromPath(data.getPath());
+            thisIdAdded = thisIdAdded || id.equals(thisId);
+            builder.put(id, data.getData());
+        }
+        if ( !thisIdAdded )
+        {
+            builder.put(thisId, payload);   // this instance is always a member
+        }
+        return builder.build();
+    }
+
+    /**
+     * Given a full ZNode path, return the member ID
+     *
+     * @param path full ZNode path
+     * @return id
+     */
+    public String idFromPath(String path)
+    {
+        return ZKPaths.getNodeFromPath(path);
+    }
+
+    protected PersistentEphemeralNode 
newPersistentEphemeralNode(CuratorFramework client, String membershipPath, 
String thisId, byte[] payload)
+    {
+        return new PersistentEphemeralNode(client, 
PersistentEphemeralNode.Mode.EPHEMERAL, ZKPaths.makePath(membershipPath, 
thisId), payload);
+    }
+
+    protected PathChildrenCache newPathChildrenCache(CuratorFramework client, 
String membershipPath)
+    {
+        return new PathChildrenCache(client, membershipPath, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/3029856c/curator-recipes/src/site/confluence/group-member.confluence
----------------------------------------------------------------------
diff --git a/curator-recipes/src/site/confluence/group-member.confluence 
b/curator-recipes/src/site/confluence/group-member.confluence
new file mode 100644
index 0000000..dcb27b3
--- /dev/null
+++ b/curator-recipes/src/site/confluence/group-member.confluence
@@ -0,0 +1,42 @@
+h1. Group Member
+
+h2. Description
+Group membership management. Adds this instance into a group and keeps a cache 
of members in the group.
+
+h2. Participating Classes
+* GroupMember
+
+h2. Usage
+h3. Creating a GroupMember
+{code}
+public GroupMember(CuratorFramework client,
+                        String membershipPath,
+                        String thisId,
+                        byte[] payload)
+Parameters:
+client - client instance
+membershipPath - the path to use for membership
+thisId - ID of this group member. MUST be unique for the group
+payload - the payload to write in our member node
+{code}
+
+h3. General Usage
+GroupMember must be started:
+{code}
+group.start();
+{code}
+
+When you are through with the GroupMember instance, you should call close:
+{code}
+group.close();
+{code}
+
+NOTE: this will remove the instance from the group
+
+You can get a current view of the members by calling:
+{code}
+group.getCurrentMembers();
+{code}
+
+h2. Error Handling
+GroupMember instances internally handle all error states recreating the node 
as necessary.

http://git-wip-us.apache.org/repos/asf/curator/blob/3029856c/curator-recipes/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-recipes/src/site/confluence/index.confluence 
b/curator-recipes/src/site/confluence/index.confluence
index 4f3a032..71de8df 100644
--- a/curator-recipes/src/site/confluence/index.confluence
+++ b/curator-recipes/src/site/confluence/index.confluence
@@ -29,7 +29,8 @@ regarding "Curator Recipes Own Their ZNode/Paths".
 |[[Tree Cache|tree-cache.html]] \- A utility that attempts to keep all data 
from all children of a ZK path locally cached. This class will watch the ZK 
path, respond to update/create/delete events, pull down the data, etc. You can 
register a listener that will get notified when changes occur.|
 
 ||Nodes||
-|[[Persistent Ephemeral Node|persistent-ephemeral-node.html]] \- An ephemeral 
node that attempts to stay present in ZooKeeper, even through connection and 
session interruptions..|
+|[[Persistent Ephemeral Node|persistent-ephemeral-node.html]] \- An ephemeral 
node that attempts to stay present in ZooKeeper, even through connection and 
session interruptions.|
+|[Group Member|group-member-node.html]] \- Group membership management. Adds 
this instance into a group and keeps a cache of members in the group.|
 
 ||Queues||
 |[[Distributed Queue|distributed-queue.html]] \- An implementation of the 
Distributed Queue ZK recipe. Items put into the queue are guaranteed to be 
ordered (by means of ZK's PERSISTENT_SEQUENTIAL node). If a single consumer 
takes items out of the queue, they will be ordered FIFO. If ordering is 
important, use a LeaderSelector to nominate a single consumer.|

http://git-wip-us.apache.org/repos/asf/curator/blob/3029856c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
----------------------------------------------------------------------
diff --git 
a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
new file mode 100644
index 0000000..3d4b951
--- /dev/null
+++ 
b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestGroupMember.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.nodes;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.Map;
+
+public class TestGroupMember extends BaseClassForTests
+{
+    // NOTE - don't need many tests as this class is just a wrapper around two 
existing recipes
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        Timing timing = new Timing();
+        GroupMember groupMember1 = null;
+        GroupMember groupMember2 = null;
+        CuratorFramework client = 
CuratorFrameworkFactory.newClient(server.getConnectString(), new 
RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            groupMember1 = new GroupMember(client, "/member", "1");
+            
Assert.assertTrue(groupMember1.getCurrentMembers().containsKey("1"));
+            groupMember1.start();
+
+            groupMember2 = new GroupMember(client, "/member", "2");
+            groupMember2.start();
+
+            timing.sleepABit();
+
+            Map<String, byte[]> currentMembers1 = 
groupMember1.getCurrentMembers();
+            Map<String, byte[]> currentMembers2 = 
groupMember2.getCurrentMembers();
+            Assert.assertEquals(currentMembers1.size(), 2);
+            Assert.assertEquals(currentMembers2.size(), 2);
+            Assert.assertEquals(currentMembers1, currentMembers2);
+            Assert.assertTrue(currentMembers1.containsKey("1"));
+            Assert.assertTrue(currentMembers1.containsKey("2"));
+
+            groupMember2.close();
+
+            timing.sleepABit();
+
+            currentMembers1 = groupMember1.getCurrentMembers();
+            Assert.assertEquals(currentMembers1.size(), 1);
+            Assert.assertTrue(currentMembers1.containsKey("1"));
+            Assert.assertFalse(currentMembers1.containsKey("2"));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(groupMember1);
+            CloseableUtils.closeQuietly(groupMember2);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+}

Reply via email to