http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
new file mode 100644
index 0000000..99a15fe
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
@@ -0,0 +1,708 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.ACL;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.Commands;
+import org.apache.aurora.common.base.ExceptionalSupplier;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.util.BackoffHelper;
+import 
org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * This class exposes methods for joining and monitoring distributed groups.  
The groups this class
+ * monitors are realized as persistent paths in ZooKeeper with ephemeral child 
nodes for
+ * each member of a group.
+ */
+public class Group {
+  private static final Logger LOG = Logger.getLogger(Group.class.getName());
+
+  private static final Supplier<byte[]> NO_MEMBER_DATA = 
Suppliers.ofInstance(null);
+  private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
+
+  private final ZooKeeperClient zkClient;
+  private final ImmutableList<ACL> acl;
+  private final String path;
+
+  private final NodeScheme nodeScheme;
+  private final Predicate<String> nodeNameFilter;
+
+  private final BackoffHelper backoffHelper;
+
+  /**
+   * Creates a group rooted at the given {@code path}.  Paths must be absolute 
and trailing or
+   * duplicate slashes will be normalized.  For example, all the following 
paths would create a
+   * group at the normalized path /my/distributed/group:
+   * <ul>
+   *   <li>/my/distributed/group
+   *   <li>/my/distributed/group/
+   *   <li>/my/distributed//group
+   * </ul>
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it 
does not already exist
+   * @param path the absolute persistent path that represents this group
+   * @param nodeScheme the scheme that defines how nodes are created
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, 
NodeScheme nodeScheme) {
+    this.zkClient = Preconditions.checkNotNull(zkClient);
+    this.acl = ImmutableList.copyOf(acl);
+    this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path));
+
+    this.nodeScheme = Preconditions.checkNotNull(nodeScheme);
+    nodeNameFilter = new Predicate<String>() {
+      @Override public boolean apply(String nodeName) {
+        return Group.this.nodeScheme.isMember(nodeName);
+      }
+    };
+
+    backoffHelper = new BackoffHelper();
+  }
+
+  /**
+   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} 
with a
+   * {@code namePrefix} of 'member_'.
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
+    this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
+  }
+
+  /**
+   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, 
NodeScheme)} with a
+   * {@link DefaultScheme} using {@code namePrefix}.
+   */
+  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, 
String namePrefix) {
+    this(zkClient, acl, path, new DefaultScheme(namePrefix));
+  }
+
+  public String getMemberPath(String memberId) {
+    return path + "/" + MorePreconditions.checkNotBlank(memberId);
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public String getMemberId(String nodePath) {
+    MorePreconditions.checkNotBlank(nodePath);
+    Preconditions.checkArgument(nodePath.startsWith(path + "/"),
+        "Not a member of this group[%s]: %s", path, nodePath);
+
+    String memberId = StringUtils.substringAfterLast(nodePath, "/");
+    Preconditions.checkArgument(nodeScheme.isMember(memberId),
+        "Not a group member: %s", memberId);
+    return memberId;
+  }
+
+  /**
+   * Returns the current list of group member ids by querying ZooKeeper 
synchronously.
+   *
+   * @return the ids of all the present members of this group
+   * @throws ZooKeeperConnectionException if there was a problem connecting to 
ZooKeeper
+   * @throws KeeperException if there was a problem reading this group's 
member ids
+   * @throws InterruptedException if this thread is interrupted listing the 
group members
+   */
+  public Iterable<String> getMemberIds()
+      throws ZooKeeperConnectionException, KeeperException, 
InterruptedException {
+    return Iterables.filter(zkClient.get().getChildren(path, false), 
nodeNameFilter);
+  }
+
+  /**
+   * Gets the data for one of this groups members by querying ZooKeeper 
synchronously.
+   *
+   * @param memberId the id of the member whose data to retrieve
+   * @return the data associated with the {@code memberId}
+   * @throws ZooKeeperConnectionException if there was a problem connecting to 
ZooKeeper
+   * @throws KeeperException if there was a problem reading this member's data
+   * @throws InterruptedException if this thread is interrupted retrieving the 
member data
+   */
+  public byte[] getMemberData(String memberId)
+      throws ZooKeeperConnectionException, KeeperException, 
InterruptedException {
+    return zkClient.get().getData(getMemberPath(memberId), false, null);
+  }
+
+  /**
+   * Represents membership in a distributed group.
+   */
+  public interface Membership {
+
+    /**
+     * Returns the persistent ZooKeeper path that represents this group.
+     */
+    String getGroupPath();
+
+    /**
+     * Returns the id (ZooKeeper node name) of this group member.  May change 
over time if the
+     * ZooKeeper session expires.
+     */
+    String getMemberId();
+
+    /**
+     * Returns the full ZooKeeper path to this group member.  May change over 
time if the
+     * ZooKeeper session expires.
+     */
+    String getMemberPath();
+
+    /**
+     * Updates the membership data synchronously using the {@code 
Supplier<byte[]>} passed to
+     * {@link Group#join()}.
+     *
+     * @return the new membership data
+     * @throws UpdateException if there was a problem updating the membership 
data
+     */
+    byte[] updateMemberData() throws UpdateException;
+
+    /**
+     * Cancels group membership by deleting the associated ZooKeeper member 
node.
+     *
+     * @throws JoinException if there is a problem deleting the node
+     */
+    void cancel() throws JoinException;
+  }
+
+  /**
+   * Indicates an error joining a group.
+   */
+  public static class JoinException extends Exception {
+    public JoinException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Indicates an error updating a group member's data.
+   */
+  public static class UpdateException extends Exception {
+    public UpdateException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Equivalent to calling {@code join(null, null)}.
+   */
+  public final Membership join() throws JoinException, InterruptedException {
+    return join(NO_MEMBER_DATA, null);
+  }
+
+  /**
+   * Equivalent to calling {@code join(memberData, null)}.
+   */
+  public final Membership join(Supplier<byte[]> memberData)
+      throws JoinException, InterruptedException {
+
+    return join(memberData, null);
+  }
+
+  /**
+   * Equivalent to calling {@code join(null, onLoseMembership)}.
+   */
+  public final Membership join(@Nullable final Command onLoseMembership)
+      throws JoinException, InterruptedException {
+
+    return join(NO_MEMBER_DATA, onLoseMembership);
+  }
+
+  /**
+   * Joins this group and returns the resulting Membership when successful.  
Membership will be
+   * automatically cancelled when the current jvm process dies; however the 
returned Membership
+   * object can be used to cancel membership earlier.  Unless
+   * {@link Group.Membership#cancel()} is called the membership will
+   * be maintained by re-establishing it silently in the background.
+   *
+   * <p>Any {@code memberData} given is persisted in the member node in 
ZooKeeper.  If an
+   * {@code onLoseMembership} callback is supplied, it will be notified each 
time this member loses
+   * membership in the group.
+   *
+   * @param memberData a supplier of the data to store in the member node
+   * @param onLoseMembership a callback to notify when membership is lost
+   * @return a Membership object with the member details
+   * @throws JoinException if there was a problem joining the group
+   * @throws InterruptedException if this thread is interrupted awaiting 
completion of the join
+   */
+  public final Membership join(Supplier<byte[]> memberData, @Nullable Command 
onLoseMembership)
+      throws JoinException, InterruptedException {
+
+    Preconditions.checkNotNull(memberData);
+    ensurePersistentGroupPath();
+
+    final ActiveMembership groupJoiner = new ActiveMembership(memberData, 
onLoseMembership);
+    return backoffHelper.doUntilResult(new ExceptionalSupplier<Membership, 
JoinException>() {
+      @Override public Membership get() throws JoinException {
+        try {
+          return groupJoiner.join();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new JoinException("Interrupted trying to join group at path: " 
+ path, e);
+        } catch (ZooKeeperConnectionException e) {
+          LOG.log(Level.WARNING, "Temporary error trying to join group at 
path: " + path, e);
+          return null;
+        } catch (KeeperException e) {
+          if (zkClient.shouldRetry(e)) {
+            LOG.log(Level.WARNING, "Temporary error trying to join group at 
path: " + path, e);
+            return null;
+          } else {
+            throw new JoinException("Problem joining partition group at path: 
" + path, e);
+          }
+        }
+      }
+    });
+  }
+
+  private void ensurePersistentGroupPath() throws JoinException, 
InterruptedException {
+    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, 
JoinException>() {
+      @Override public Boolean get() throws JoinException {
+        try {
+          ZooKeeperUtils.ensurePath(zkClient, acl, path);
+          return true;
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new JoinException("Interrupted trying to ensure group at path: 
" + path, e);
+        } catch (ZooKeeperConnectionException e) {
+          LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", 
e);
+          return false;
+        } catch (KeeperException e) {
+          if (zkClient.shouldRetry(e)) {
+            LOG.log(Level.WARNING, "Temporary error ensuring path: " + path, 
e);
+            return false;
+          } else {
+            throw new JoinException("Problem ensuring group at path: " + path, 
e);
+          }
+        }
+      }
+    });
+  }
+
+  private class ActiveMembership implements Membership {
+    private final Supplier<byte[]> memberData;
+    private final Command onLoseMembership;
+    private String nodePath;
+    private String memberId;
+    private volatile boolean cancelled;
+    private byte[] membershipData;
+
+    public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command 
onLoseMembership) {
+      this.memberData = memberData;
+      this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : 
onLoseMembership;
+    }
+
+    @Override
+    public String getGroupPath() {
+      return path;
+    }
+
+    @Override
+    public synchronized String getMemberId() {
+      return memberId;
+    }
+
+    @Override
+    public synchronized String getMemberPath() {
+      return nodePath;
+    }
+
+    @Override
+    public synchronized byte[] updateMemberData() throws UpdateException {
+      byte[] membershipData = memberData.get();
+      if (!ArrayUtils.isEquals(this.membershipData, membershipData)) {
+        try {
+          zkClient.get().setData(nodePath, membershipData, 
ZooKeeperUtils.ANY_VERSION);
+          this.membershipData = membershipData;
+        } catch (KeeperException e) {
+          throw new UpdateException("Problem updating membership data.", e);
+        } catch (InterruptedException e) {
+          throw new UpdateException("Interrupted attempting to update 
membership data.", e);
+        } catch (ZooKeeperConnectionException e) {
+          throw new UpdateException(
+              "Could not connect to the ZooKeeper cluster to update membership 
data.", e);
+        }
+      }
+      return membershipData;
+    }
+
+    @Override
+    public synchronized void cancel() throws JoinException {
+      if (!cancelled) {
+        try {
+          backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, 
JoinException>() {
+            @Override public Boolean get() throws JoinException {
+              try {
+                zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
+                return true;
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new JoinException("Interrupted trying to cancel 
membership: " + nodePath, e);
+              } catch (ZooKeeperConnectionException e) {
+                LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, 
retrying", e);
+                return false;
+              } catch (NoNodeException e) {
+                LOG.info("Membership already cancelled, node at path: " + 
nodePath +
+                         " has been deleted");
+                return true;
+              } catch (KeeperException e) {
+                if (zkClient.shouldRetry(e)) {
+                  LOG.log(Level.WARNING, "Temporary error cancelling 
membership: " + nodePath, e);
+                  return false;
+                } else {
+                  throw new JoinException("Problem cancelling membership: " + 
nodePath, e);
+                }
+              }
+            }
+          });
+          cancelled = true; // Prevent auto-re-join logic from undoing this 
cancel.
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new JoinException("Problem cancelling membership: " + 
nodePath, e);
+        }
+      }
+    }
+
+    private class CancelledException extends IllegalStateException { /* marker 
*/ }
+
+    synchronized Membership join()
+        throws ZooKeeperConnectionException, InterruptedException, 
KeeperException {
+
+      if (cancelled) {
+        throw new CancelledException();
+      }
+
+      if (nodePath == null) {
+        // Re-join if our ephemeral node goes away due to session expiry - 
only needs to be
+        // registered once.
+        zkClient.registerExpirationHandler(new Command() {
+          @Override public void execute() {
+            tryJoin();
+          }
+        });
+      }
+
+      byte[] membershipData = memberData.get();
+      String nodeName = nodeScheme.createName(membershipData);
+      CreateMode createMode = nodeScheme.isSequential()
+          ? CreateMode.EPHEMERAL_SEQUENTIAL
+          : CreateMode.EPHEMERAL;
+      nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, 
acl, createMode);
+      memberId = Group.this.getMemberId(nodePath);
+      LOG.info("Set group member ID to " + memberId);
+      this.membershipData = membershipData;
+
+      // Re-join if our ephemeral node goes away due to maliciousness.
+      zkClient.get().exists(nodePath, new Watcher() {
+        @Override public void process(WatchedEvent event) {
+          if (event.getType() == EventType.NodeDeleted) {
+            tryJoin();
+          }
+        }
+      });
+
+      return this;
+    }
+
+    private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
+        new ExceptionalSupplier<Boolean, InterruptedException>() {
+          @Override public Boolean get() throws InterruptedException {
+            try {
+              join();
+              return true;
+            } catch (CancelledException e) {
+              // Lost a cancel race - that's ok.
+              return true;
+            } catch (ZooKeeperConnectionException e) {
+              LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, 
retrying", e);
+              return false;
+            } catch (KeeperException e) {
+              if (zkClient.shouldRetry(e)) {
+                LOG.log(Level.WARNING, "Temporary error re-joining group: " + 
path, e);
+                return false;
+              } else {
+                throw new IllegalStateException("Permanent problem re-joining 
group: " + path, e);
+              }
+            }
+          }
+        };
+
+    private synchronized void tryJoin() {
+      onLoseMembership.execute();
+      try {
+        backoffHelper.doUntilSuccess(tryJoin);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(
+            String.format("Interrupted while trying to re-join group: %s, 
giving up", path), e);
+      }
+    }
+  }
+
+  /**
+   * An interface to an object that listens for changes to a group's 
membership.
+   */
+  public interface GroupChangeListener {
+
+    /**
+     * Called whenever group membership changes with the new list of member 
ids.
+     *
+     * @param memberIds the current member ids
+     */
+    void onGroupChange(Iterable<String> memberIds);
+  }
+
+  /**
+   * An interface that dictates the scheme to use for storing and filtering 
nodes that represent
+   * members of a distributed group.
+   */
+  public interface NodeScheme {
+    /**
+     * Determines if a child node is a member of a group by examining the 
node's name.
+     *
+     * @param nodeName the name of a child node found in a group
+     * @return {@code true} if {@code nodeName} identifies a group member in 
this scheme
+     */
+    boolean isMember(String nodeName);
+
+    /**
+     * Generates a node name for the node representing this process in the 
distributed group.
+     *
+     * @param membershipData the data that will be stored in this node
+     * @return the name for the node that will represent this process in the 
group
+     */
+    String createName(byte[] membershipData);
+
+    /**
+     * Indicates whether this scheme needs ephemeral sequential nodes or just 
ephemeral nodes.
+     *
+     * @return {@code true} if this scheme requires sequential node names; 
{@code false} otherwise
+     */
+    boolean isSequential();
+  }
+
+  /**
+   * Indicates an error watching a group.
+   */
+  public static class WatchException extends Exception {
+    public WatchException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Watches this group for the lifetime of this jvm process.  This method 
will block until the
+   * current group members are available, notify the {@code 
groupChangeListener} and then return.
+   * All further changes to the group membership will cause notifications on a 
background thread.
+   *
+   * @param groupChangeListener the listener to notify of group membership 
change events
+   * @return A command which, when executed, will stop watching the group.
+   * @throws WatchException if there is a problem generating the 1st group 
membership list
+   * @throws InterruptedException if interrupted waiting to gather the 1st 
group membership list
+   */
+  public final Command watch(final GroupChangeListener groupChangeListener)
+      throws WatchException, InterruptedException {
+    Preconditions.checkNotNull(groupChangeListener);
+
+    try {
+      ensurePersistentGroupPath();
+    } catch (JoinException e) {
+      throw new WatchException("Failed to create group path: " + path, e);
+    }
+
+    final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
+    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, 
WatchException>() {
+      @Override public Boolean get() throws WatchException {
+        try {
+          groupMonitor.watchGroup();
+          return true;
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new WatchException("Interrupted trying to watch group at path: 
" + path, e);
+        } catch (ZooKeeperConnectionException e) {
+          LOG.log(Level.WARNING, "Temporary error trying to watch group at 
path: " + path, e);
+          return null;
+        } catch (KeeperException e) {
+          if (zkClient.shouldRetry(e)) {
+            LOG.log(Level.WARNING, "Temporary error trying to watch group at 
path: " + path, e);
+            return null;
+          } else {
+            throw new WatchException("Problem trying to watch group at path: " 
+ path, e);
+          }
+        }
+      }
+    });
+    return new Command() {
+      @Override public void execute() {
+        groupMonitor.stopWatching();
+      }
+    };
+  }
+
+  /**
+   * Helps continuously monitor a group for membership changes.
+   */
+  private class GroupMonitor {
+    private final GroupChangeListener groupChangeListener;
+    private volatile boolean stopped = false;
+    private Set<String> members;
+
+    GroupMonitor(GroupChangeListener groupChangeListener) {
+      this.groupChangeListener = groupChangeListener;
+    }
+
+    private final Watcher groupWatcher = new Watcher() {
+      @Override public final void process(WatchedEvent event) {
+        if (event.getType() == EventType.NodeChildrenChanged) {
+          tryWatchGroup();
+        }
+      }
+    };
+
+    private final ExceptionalSupplier<Boolean, InterruptedException> 
tryWatchGroup =
+        new ExceptionalSupplier<Boolean, InterruptedException>() {
+          @Override public Boolean get() throws InterruptedException {
+            try {
+              watchGroup();
+              return true;
+            } catch (ZooKeeperConnectionException e) {
+              LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, 
retrying", e);
+              return false;
+            } catch (KeeperException e) {
+              if (zkClient.shouldRetry(e)) {
+                LOG.log(Level.WARNING, "Temporary error re-watching group: " + 
path, e);
+                return false;
+              } else {
+                throw new IllegalStateException("Permanent problem re-watching 
group: " + path, e);
+              }
+            }
+          }
+        };
+
+    private void tryWatchGroup() {
+      if (stopped) {
+        return;
+      }
+
+      try {
+        backoffHelper.doUntilSuccess(tryWatchGroup);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(
+            String.format("Interrupted while trying to re-watch group: %s, 
giving up", path), e);
+      }
+    }
+
+    private void watchGroup()
+        throws ZooKeeperConnectionException, InterruptedException, 
KeeperException {
+
+      if (stopped) {
+        return;
+      }
+
+      List<String> children = zkClient.get().getChildren(path, groupWatcher);
+      setMembers(Iterables.filter(children, nodeNameFilter));
+    }
+
+    private void stopWatching() {
+      // TODO(William Farner): Cancel the watch when
+      // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
+      LOG.info("Stopping watch on " + this);
+      stopped = true;
+    }
+
+    synchronized void setMembers(Iterable<String> members) {
+      if (stopped) {
+        LOG.info("Suppressing membership update, no longer watching " + this);
+        return;
+      }
+
+      if (this.members == null) {
+        // Reset our watch on the group if session expires - only needs to be 
registered once.
+        zkClient.registerExpirationHandler(new Command() {
+          @Override public void execute() {
+            tryWatchGroup();
+          }
+        });
+      }
+
+      Set<String> membership = ImmutableSet.copyOf(members);
+      if (!membership.equals(this.members)) {
+        groupChangeListener.onGroupChange(members);
+        this.members = membership;
+      }
+    }
+  }
+
+  /**
+   * Default naming scheme implementation. Stores nodes at [given path] + "/" 
+ [given prefix] +
+   * ZooKeeper-generated member ID. For example, if the path is 
"/discovery/servicename", and the
+   * prefix is "member_", the node's full path will look something like
+   * {@code /discovery/servicename/member_0000000007}.
+   */
+  public static class DefaultScheme implements NodeScheme {
+    private final String namePrefix;
+    private final Pattern namePattern;
+
+    /**
+     * Creates a sequential node scheme based on the given node name prefix.
+     *
+     * @param namePrefix the prefix for the names of the member nodes
+     */
+    public DefaultScheme(String namePrefix) {
+      this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
+      namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + 
"-?[0-9]+$");
+    }
+
+    @Override
+    public boolean isMember(String nodeName) {
+      return namePattern.matcher(nodeName).matches();
+    }
+
+    @Override
+    public String createName(byte[] membershipData) {
+      return namePrefix;
+    }
+
+    @Override
+    public boolean isSequential() {
+      return true;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "Group " + path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java
new file mode 100644
index 0000000..91ea345
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Ordering;
+import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.Membership;
+import org.apache.aurora.common.zookeeper.Group.UpdateException;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.zookeeper.data.ACL;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * A distributed mechanism for eventually arriving at an evenly partitioned 
space of long values.
+ * A typical usage would have a client on each of several hosts joining a 
logical partition (a
+ * "partition group") that represents some shared work.  Clients could then 
process a subset of a
+ * full body of work by testing any given item of work with their partition 
filter.
+ *
+ * <p>Note that clients must be able to tolerate periods of duplicate 
processing by more than 1
+ * partition as explained in {@link #join()}.
+ *
+ * @author John Sirois
+ */
+public class Partitioner {
+
+  private static final Logger LOG = 
Logger.getLogger(Partitioner.class.getName());
+
+  private volatile int groupSize;
+  private volatile int groupIndex;
+  private final Group group;
+
+  /**
+   * Constructs a representation of a partition group but does not join it.  
Note that the partition
+   * group path will be created as a persistent zookeeper path if it does not 
already exist.
+   *
+   * @param zkClient a client to use for joining the partition group and 
watching its membership
+   * @param acl the acl for this partition group
+   * @param path a zookeeper path that represents the partition group
+   */
+  public Partitioner(ZooKeeperClient zkClient, List<ACL> acl, String path) {
+    group = new Group(zkClient, acl, path);
+  }
+
+  @VisibleForTesting
+  int getGroupSize() {
+    return groupSize;
+  }
+
+  /**
+   * Represents a slice of a partition group.  The partition is dynamic and 
will adjust its size as
+   * members join and leave its partition group.
+   */
+  public abstract static class Partition implements Predicate<Long>, 
Membership {
+
+    /**
+     * Returns {@code true} if the given {@code value} is a member of this 
partition at this time.
+     */
+    public abstract boolean isMember(long value);
+
+    /**
+     * Gets number of members in the group at this time.
+     *
+     * @return number of members in the ZK group at this time.
+     */
+    public abstract int getNumPartitions();
+
+    /**
+     * Evaluates partition membership based on the given {@code value}'s hash 
code.  If the value
+     * is null it is never a member of a partition.
+     */
+    boolean isMember(Object value) {
+      return (value != null) && isMember(value.hashCode());
+    }
+
+    /**
+     * Equivalent to {@link #isMember(long)} for all non-null values; however 
incurs unboxing
+     * overhead.
+     */
+    @Override
+    public boolean apply(@Nullable Long input) {
+      return (input != null) && isMember(input);
+    }
+  }
+
+  /**
+   * Attempts to join the partition group and claim a slice.  When successful, 
a predicate is
+   * returned that can be used to test whether or not an item belongs to this 
partition.  The
+   * predicate is dynamic such that as the group is further partitioned or 
partitions merge the
+   * predicate will claim a narrower or wider swath of the partition space 
respectively.  Partition
+   * creation and merging is not instantaneous and clients should expect 
independent partitions to
+   * claim ownership of some items when partition membership is in flux.  It 
is only in the steady
+   * state that a client should expect independent partitions to divide the 
partition space evenly
+   * and without overlap.
+   *
+   * <p>TODO(John Sirois): consider adding a version with a global timeout for 
the join operation.
+   *
+   * @return the partition representing the slice of the partition group this 
member can claim
+   * @throws JoinException if there was a problem joining the partition group
+   * @throws InterruptedException if interrupted while waiting to join the 
partition group
+   */
+  public final Partition join() throws JoinException, InterruptedException {
+    final Membership membership = group.join();
+    try {
+      group.watch(createGroupChangeListener(membership));
+    } catch (WatchException e) {
+      membership.cancel();
+      throw new JoinException("Problem establishing watch on group after 
joining it", e);
+    }
+    return new Partition() {
+      @Override public boolean isMember(long value) {
+        return (value % groupSize) == groupIndex;
+      }
+
+      @Override public int getNumPartitions() {
+        return groupSize;
+      }
+
+      @Override public String getGroupPath() {
+        return membership.getGroupPath();
+      }
+
+      @Override public String getMemberId() {
+        return membership.getMemberId();
+      }
+
+      @Override public String getMemberPath() {
+        return membership.getMemberPath();
+      }
+
+      @Override public byte[] updateMemberData() throws UpdateException {
+        return membership.updateMemberData();
+      }
+
+      @Override public void cancel() throws JoinException {
+        membership.cancel();
+      }
+    };
+  }
+
+  @VisibleForTesting GroupChangeListener createGroupChangeListener(final 
Membership membership) {
+    return new GroupChangeListener() {
+      @Override public void onGroupChange(Iterable<String> memberIds) {
+        List<String> members = Ordering.natural().sortedCopy(memberIds);
+        int newSize = members.size();
+        int newIndex = members.indexOf(membership.getMemberId());
+
+        LOG.info(String.format("Rebuilding group %s:%s [%d:%d]->[%d:%d]",
+            membership.getGroupPath(), members, groupSize, groupIndex, 
newSize, newIndex));
+
+        groupSize = newSize;
+        groupIndex = newIndex;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
new file mode 100644
index 0000000..fb578e1
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import org.apache.aurora.common.net.pool.DynamicHostSet;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+/**
+ * A logical set of servers registered in ZooKeeper.  Intended to be used by 
both servers in a
+ * common service and their clients.
+ *
+ * TODO(William Farner): Explore decoupling this from thrift.
+ */
+public interface ServerSet extends DynamicHostSet<ServiceInstance> {
+
+  /**
+   * Attempts to join a server set for this logical service group.
+   *
+   * @param endpoint the primary service endpoint
+   * @param additionalEndpoints and additional endpoints keyed by their 
logical name
+   * @param status the current service status
+   * @return an EndpointStatus object that allows the endpoint to adjust its 
status
+   * @throws JoinException if there was a problem joining the server set
+   * @throws InterruptedException if interrupted while waiting to join the 
server set
+   * @deprecated The status field is deprecated. Please use {@link 
#join(InetSocketAddress, Map)}
+   */
+  @Deprecated
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      Status status) throws JoinException, InterruptedException;
+
+  /**
+   * Attempts to join a server set for this logical service group.
+   *
+   * @param endpoint the primary service endpoint
+   * @param additionalEndpoints and additional endpoints keyed by their 
logical name
+   * @return an EndpointStatus object that allows the endpoint to adjust its 
status
+   * @throws JoinException if there was a problem joining the server set
+   * @throws InterruptedException if interrupted while waiting to join the 
server set
+   */
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints)
+      throws JoinException, InterruptedException;
+
+  /**
+   * Attempts to join a server set for this logical service group.
+   *
+   * @param endpoint the primary service endpoint
+   * @param additionalEndpoints and additional endpoints keyed by their 
logical name
+   * @param shardId Unique shard identifier for this member of the service.
+   * @return an EndpointStatus object that allows the endpoint to adjust its 
status
+   * @throws JoinException if there was a problem joining the server set
+   * @throws InterruptedException if interrupted while waiting to join the 
server set
+   */
+  EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      int shardId) throws JoinException, InterruptedException;
+
+  /**
+   * A handle to a service endpoint's status data that allows updating it to 
track current events.
+   */
+  public interface EndpointStatus {
+
+    /**
+     * Attempts to update the status of the service endpoint associated with 
this endpoint.  If the
+     * {@code status} is {@link Status#DEAD} then the endpoint will be removed 
from the server set.
+     *
+     * @param status the current status of the endpoint
+     * @throws UpdateException if there was a problem writing the update
+     * @deprecated Support for mutable status is deprecated. Please use {@link 
#leave()}
+     */
+    @Deprecated
+    void update(Status status) throws UpdateException;
+
+    /**
+     * Removes the endpoint from the server set.
+     *
+     * @throws UpdateException if there was a problem leaving the ServerSet.
+     */
+    void leave() throws UpdateException;
+  }
+
+  /**
+   * Indicates an error updating a service's status information.
+   */
+  public static class UpdateException extends Exception {
+    public UpdateException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    public UpdateException(String message) {
+      super(message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
new file mode 100644
index 0000000..9a33d3e
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
@@ -0,0 +1,602 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Throwables;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Sets.SetView;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.google.gson.Gson;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+
+import org.apache.aurora.common.args.Arg;
+import org.apache.aurora.common.args.CmdLine;
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.Function;
+import org.apache.aurora.common.base.Supplier;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.io.CompatibilityCodec;
+import org.apache.aurora.common.io.ThriftCodec;
+import org.apache.aurora.common.util.BackoffHelper;
+
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * ZooKeeper-backed implementation of {@link ServerSet}.
+ */
+public class ServerSetImpl implements ServerSet {
+  private static final Logger LOG = 
Logger.getLogger(ServerSetImpl.class.getName());
+
+  @CmdLine(name = "serverset_encode_json",
+           help = "If true, use JSON for encoding server set information."
+               + " Defaults to true (use JSON).")
+  private static final Arg<Boolean> ENCODE_JSON = Arg.create(true);
+
+  private final ZooKeeperClient zkClient;
+  private final Group group;
+  private final Codec<ServiceInstance> codec;
+  private final BackoffHelper backoffHelper;
+
+  /**
+   * Creates a new ServerSet using open ZooKeeper node ACLs.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, String path) {
+    this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
+  }
+
+  /**
+   * Creates a new ServerSet for the given service {@code path}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param acl the ACL to use for creating the persistent group path if it 
does not already exist
+   * @param path the name-service path of the service to connect to
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String 
path) {
+    this(zkClient, new Group(zkClient, acl, path), createDefaultCodec());
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
+    this(zkClient, group, createDefaultCodec());
+  }
+
+  /**
+   * Creates a new ServerSet using the given service {@code group} and a 
custom {@code codec}.
+   *
+   * @param zkClient the client to use for interactions with ZooKeeper
+   * @param group the server group
+   * @param codec a codec to use for serializing and de-serializing the 
ServiceInstance data to and
+   *     from a byte array
+   */
+  public ServerSetImpl(ZooKeeperClient zkClient, Group group, 
Codec<ServiceInstance> codec) {
+    this.zkClient = checkNotNull(zkClient);
+    this.group = checkNotNull(group);
+    this.codec = checkNotNull(codec);
+
+    // TODO(John Sirois): Inject the helper so that backoff strategy can be 
configurable.
+    backoffHelper = new BackoffHelper();
+  }
+
+  @VisibleForTesting
+  ZooKeeperClient getZkClient() {
+    return zkClient;
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints)
+      throws Group.JoinException, InterruptedException {
+
+    LOG.log(Level.WARNING,
+        "Joining a ServerSet without a shard ID is deprecated and will soon 
break.");
+    return join(endpoint, additionalEndpoints, Optional.<Integer>absent());
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      int shardId) throws Group.JoinException, InterruptedException {
+
+    return join(endpoint, additionalEndpoints, Optional.of(shardId));
+  }
+
+  private EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      Optional<Integer> shardId) throws Group.JoinException, 
InterruptedException {
+
+    checkNotNull(endpoint);
+    checkNotNull(additionalEndpoints);
+
+    final MemberStatus memberStatus =
+        new MemberStatus(endpoint, additionalEndpoints, shardId);
+    Supplier<byte[]> serviceInstanceSupplier = new Supplier<byte[]>() {
+      @Override public byte[] get() {
+        return memberStatus.serializeServiceInstance();
+      }
+    };
+    final Group.Membership membership = group.join(serviceInstanceSupplier);
+
+    return new EndpointStatus() {
+      @Override public void update(Status status) throws UpdateException {
+        checkNotNull(status);
+        LOG.warning("This method is deprecated. Please use leave() instead.");
+        if (status == Status.DEAD) {
+          leave();
+        } else {
+          LOG.warning("Status update has been ignored");
+        }
+      }
+
+      @Override public void leave() throws UpdateException {
+        memberStatus.leave(membership);
+      }
+    };
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> additionalEndpoints,
+      Status status) throws Group.JoinException, InterruptedException {
+
+    LOG.warning("This method is deprecated. Please do not specify a status 
field.");
+    if (status != Status.ALIVE) {
+      
LOG.severe("**************************************************************************\n"
+          + "WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.\n"
+          + "JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status
+          + 
"\n**************************************************************************");
+    }
+    return join(endpoint, additionalEndpoints);
+  }
+
+  @Override
+  public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws 
MonitorException {
+    ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, 
monitor);
+    try {
+      return serverSetWatcher.watch();
+    } catch (Group.WatchException e) {
+      throw new MonitorException("ZooKeeper watch failed.", e);
+    } catch (InterruptedException e) {
+      throw new MonitorException("Interrupted while watching ZooKeeper.", e);
+    }
+  }
+
+  @Override
+  public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws 
MonitorException {
+    LOG.warning("This method is deprecated. Please use watch instead.");
+    watch(monitor);
+  }
+
+  private class MemberStatus {
+    private final InetSocketAddress endpoint;
+    private final Map<String, InetSocketAddress> additionalEndpoints;
+    private final Optional<Integer> shardId;
+
+    private MemberStatus(
+        InetSocketAddress endpoint,
+        Map<String, InetSocketAddress> additionalEndpoints,
+        Optional<Integer> shardId) {
+
+      this.endpoint = endpoint;
+      this.additionalEndpoints = additionalEndpoints;
+      this.shardId = shardId;
+    }
+
+    synchronized void leave(Group.Membership membership) throws 
UpdateException {
+      try {
+        membership.cancel();
+      } catch (Group.JoinException e) {
+        throw new UpdateException(
+            "Failed to auto-cancel group membership on transition to DEAD 
status", e);
+      }
+    }
+
+    byte[] serializeServiceInstance() {
+      ServiceInstance serviceInstance = new ServiceInstance(
+          ServerSets.toEndpoint(endpoint),
+          Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
+          Status.ALIVE);
+
+      if (shardId.isPresent()) {
+        serviceInstance.setShard(shardId.get());
+      }
+
+      LOG.fine("updating endpoint data to:\n\t" + serviceInstance);
+      try {
+        return ServerSets.serializeServiceInstance(serviceInstance, codec);
+      } catch (IOException e) {
+        throw new IllegalStateException("Unexpected problem serializing thrift 
struct " +
+            serviceInstance + "to a byte[]", e);
+      }
+    }
+  }
+
+  private static class ServiceInstanceFetchException extends RuntimeException {
+    ServiceInstanceFetchException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  private static class ServiceInstanceDeletedException extends 
RuntimeException {
+    ServiceInstanceDeletedException(String path) {
+      super(path);
+    }
+  }
+
+  private class ServerSetWatcher {
+    private final ZooKeeperClient zkClient;
+    private final HostChangeMonitor<ServiceInstance> monitor;
+    @Nullable private ImmutableSet<ServiceInstance> serverSet;
+
+    ServerSetWatcher(ZooKeeperClient zkClient, 
HostChangeMonitor<ServiceInstance> monitor) {
+      this.zkClient = zkClient;
+      this.monitor = monitor;
+    }
+
+    public Command watch() throws Group.WatchException, InterruptedException {
+      Watcher onExpirationWatcher = zkClient.registerExpirationHandler(new 
Command() {
+        @Override public void execute() {
+          // Servers may have changed Status while we were disconnected from 
ZooKeeper, check and
+          // re-register our node watches.
+          rebuildServerSet();
+        }
+      });
+
+      try {
+        return group.watch(new Group.GroupChangeListener() {
+          @Override public void onGroupChange(Iterable<String> memberIds) {
+            notifyGroupChange(memberIds);
+          }
+        });
+      } catch (Group.WatchException e) {
+        zkClient.unregister(onExpirationWatcher);
+        throw e;
+      } catch (InterruptedException e) {
+        zkClient.unregister(onExpirationWatcher);
+        throw e;
+      }
+    }
+
+    private ServiceInstance getServiceInstance(final String nodePath) {
+      try {
+        return backoffHelper.doUntilResult(new Supplier<ServiceInstance>() {
+          @Override public ServiceInstance get() {
+            try {
+              byte[] data = zkClient.get().getData(nodePath, false, null);
+              return ServerSets.deserializeServiceInstance(data, codec);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new ServiceInstanceFetchException(
+                  "Interrupted updating service data for: " + nodePath, e);
+            } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+              LOG.log(Level.WARNING,
+                  "Temporary error trying to updating service data for: " + 
nodePath, e);
+              return null;
+            } catch (NoNodeException e) {
+              invalidateNodePath(nodePath);
+              throw new ServiceInstanceDeletedException(nodePath);
+            } catch (KeeperException e) {
+              if (zkClient.shouldRetry(e)) {
+                LOG.log(Level.WARNING,
+                    "Temporary error trying to update service data for: " + 
nodePath, e);
+                return null;
+              } else {
+                throw new ServiceInstanceFetchException(
+                    "Failed to update service data for: " + nodePath, e);
+              }
+            } catch (IOException e) {
+              throw new ServiceInstanceFetchException(
+                  "Failed to deserialize the ServiceInstance data for: " + 
nodePath, e);
+            }
+          }
+        });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new ServiceInstanceFetchException(
+            "Interrupted trying to update service data for: " + nodePath, e);
+      }
+    }
+
+    private final LoadingCache<String, ServiceInstance> servicesByMemberId =
+        CacheBuilder.newBuilder().build(new CacheLoader<String, 
ServiceInstance>() {
+          @Override public ServiceInstance load(String memberId) {
+            return getServiceInstance(group.getMemberPath(memberId));
+          }
+        });
+
+    private void rebuildServerSet() {
+      Set<String> memberIds = 
ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
+      servicesByMemberId.invalidateAll();
+      notifyGroupChange(memberIds);
+    }
+
+    private String invalidateNodePath(String deletedPath) {
+      String memberId = group.getMemberId(deletedPath);
+      servicesByMemberId.invalidate(memberId);
+      return memberId;
+    }
+
+    private final Function<String, ServiceInstance> MAYBE_FETCH_NODE =
+        new Function<String, ServiceInstance>() {
+          @Override public ServiceInstance apply(String memberId) {
+            // This get will trigger a fetch
+            try {
+              return servicesByMemberId.getUnchecked(memberId);
+            } catch (UncheckedExecutionException e) {
+              Throwable cause = e.getCause();
+              if (!(cause instanceof ServiceInstanceDeletedException)) {
+                Throwables.propagateIfInstanceOf(cause, 
ServiceInstanceFetchException.class);
+                throw new IllegalStateException(
+                    "Unexpected error fetching member data for: " + memberId, 
e);
+              }
+              return null;
+            }
+          }
+        };
+
+    private synchronized void notifyGroupChange(Iterable<String> memberIds) {
+      ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
+      Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
+
+      // Ignore no-op state changes except for the 1st when we've seen no 
group yet.
+      if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
+        SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, 
newMemberIds);
+        // Implicit removal from servicesByMemberId.
+        existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
+
+        Iterable<ServiceInstance> serviceInstances = Iterables.filter(
+            Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), 
Predicates.notNull());
+
+        notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
+      }
+    }
+
+    private void notifyServerSetChange(ImmutableSet<ServiceInstance> 
currentServerSet) {
+      // ZK nodes may have changed if there was a session expiry for a server 
in the server set, but
+      // if the server's status has not changed, we can skip any onChange 
updates.
+      if (!currentServerSet.equals(serverSet)) {
+        if (currentServerSet.isEmpty()) {
+          LOG.warning("server set empty for path " + group.getPath());
+        } else {
+          if (LOG.isLoggable(Level.INFO)) {
+            if (serverSet == null) {
+              LOG.info("received initial membership " + currentServerSet);
+            } else {
+              logChange(Level.INFO, currentServerSet);
+            }
+          }
+        }
+        serverSet = currentServerSet;
+        monitor.onChange(serverSet);
+      }
+    }
+
+    private void logChange(Level level, ImmutableSet<ServiceInstance> 
newServerSet) {
+      StringBuilder message = new StringBuilder("server set " + 
group.getPath() + " change: ");
+      if (serverSet.size() != newServerSet.size()) {
+        message.append("from ").append(serverSet.size())
+            .append(" members to ").append(newServerSet.size());
+      }
+
+      Joiner joiner = Joiner.on("\n\t\t");
+
+      SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
+      if (!left.isEmpty()) {
+        message.append("\n\tleft:\n\t\t").append(joiner.join(left));
+      }
+
+      SetView<ServiceInstance> joined = Sets.difference(newServerSet, 
serverSet);
+      if (!joined.isEmpty()) {
+        message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
+      }
+
+      LOG.log(level, message.toString());
+    }
+  }
+
+  private static class EndpointSchema {
+    final String host;
+    final Integer port;
+
+    EndpointSchema(Endpoint endpoint) {
+      Preconditions.checkNotNull(endpoint);
+      this.host = endpoint.getHost();
+      this.port = endpoint.getPort();
+    }
+
+    String getHost() {
+      return host;
+    }
+
+    Integer getPort() {
+      return port;
+    }
+  }
+
+  private static class ServiceInstanceSchema {
+    final EndpointSchema serviceEndpoint;
+    final Map<String, EndpointSchema> additionalEndpoints;
+    final Status status;
+    final Integer shard;
+
+    ServiceInstanceSchema(ServiceInstance instance) {
+      this.serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
+      if (instance.getAdditionalEndpoints() != null) {
+        this.additionalEndpoints = Maps.transformValues(
+            instance.getAdditionalEndpoints(),
+            new Function<Endpoint, EndpointSchema>() {
+              @Override public EndpointSchema apply(Endpoint endpoint) {
+                return new EndpointSchema(endpoint);
+              }
+            }
+        );
+      } else {
+        this.additionalEndpoints = Maps.newHashMap();
+      }
+      this.status  = instance.getStatus();
+      this.shard = instance.isSetShard() ? instance.getShard() : null;
+    }
+
+    EndpointSchema getServiceEndpoint() {
+      return serviceEndpoint;
+    }
+
+    Map<String, EndpointSchema> getAdditionalEndpoints() {
+      return additionalEndpoints;
+    }
+
+    Status getStatus() {
+      return status;
+    }
+
+    Integer getShard() {
+      return shard;
+    }
+  }
+
+  /**
+   * An adapted JSON codec that makes use of {@link ServiceInstanceSchema} to 
circumvent the
+   * __isset_bit_vector internal thrift struct field that tracks primitive 
types.
+   */
+  private static class AdaptedJsonCodec implements Codec<ServiceInstance> {
+    private static final Charset ENCODING = Charsets.UTF_8;
+    private static final Class<ServiceInstanceSchema> CLASS = 
ServiceInstanceSchema.class;
+    private final Gson gson = new Gson();
+
+    @Override
+    public void serialize(ServiceInstance instance, OutputStream sink) throws 
IOException {
+      Writer w = new OutputStreamWriter(sink, ENCODING);
+      gson.toJson(new ServiceInstanceSchema(instance), CLASS, w);
+      w.flush();
+    }
+
+    @Override
+    public ServiceInstance deserialize(InputStream source) throws IOException {
+      ServiceInstanceSchema output = gson.fromJson(new 
InputStreamReader(source, ENCODING), CLASS);
+      Endpoint primary = new Endpoint(
+          output.getServiceEndpoint().getHost(), 
output.getServiceEndpoint().getPort());
+      Map<String, Endpoint> additional = Maps.transformValues(
+          output.getAdditionalEndpoints(),
+          new Function<EndpointSchema, Endpoint>() {
+            @Override public Endpoint apply(EndpointSchema endpoint) {
+              return new Endpoint(endpoint.getHost(), endpoint.getPort());
+            }
+          }
+      );
+      ServiceInstance instance =
+          new ServiceInstance(primary, ImmutableMap.copyOf(additional), 
output.getStatus());
+      if (output.getShard() != null) {
+        instance.setShard(output.getShard());
+      }
+      return instance;
+    }
+  }
+
+  private static Codec<ServiceInstance> createCodec(final boolean 
useJsonEncoding) {
+    final Codec<ServiceInstance> json = new AdaptedJsonCodec();
+    final Codec<ServiceInstance> thrift =
+        ThriftCodec.create(ServiceInstance.class, ThriftCodec.BINARY_PROTOCOL);
+    final Predicate<byte[]> recognizer = new Predicate<byte[]>() {
+      public boolean apply(byte[] input) {
+        return (input.length > 1 && input[0] == '{' && input[1] == '\"') == 
useJsonEncoding;
+      }
+    };
+
+    if (useJsonEncoding) {
+      return CompatibilityCodec.create(json, thrift, 2, recognizer);
+    }
+    return CompatibilityCodec.create(thrift, json, 2, recognizer);
+  }
+
+  /**
+   * Creates a codec for {@link ServiceInstance} objects that uses Thrift 
binary encoding, and can
+   * decode both Thrift and JSON encodings.
+   *
+   * @return a new codec instance.
+   */
+  public static Codec<ServiceInstance> createThriftCodec() {
+    return createCodec(false);
+  }
+
+  /**
+   * Creates a codec for {@link ServiceInstance} objects that uses JSON 
encoding, and can decode
+   * both Thrift and JSON encodings.
+   *
+   * @return a new codec instance.
+   */
+  public static Codec<ServiceInstance> createJsonCodec() {
+    return createCodec(true);
+  }
+
+  /**
+   * Returns a codec for {@link ServiceInstance} objects that uses either the 
Thrift or the JSON
+   * encoding, depending on whether the command line argument 
<tt>serverset_json_encofing</tt> is
+   * set to <tt>true</tt>, and can decode both Thrift and JSON encodings.
+   *
+   * @return a new codec instance.
+   */
+  public static Codec<ServiceInstance> createDefaultCodec() {
+    return createCodec(ENCODE_JSON.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
new file mode 100644
index 0000000..2b99268
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.zookeeper.data.ACL;
+
+import org.apache.aurora.common.base.Function;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+/**
+ * Common ServerSet related functions
+ */
+public class ServerSets {
+
+  private ServerSets() {
+    // Utility class.
+  }
+
+  /**
+   * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
+   */
+  public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
+      new Function<InetSocketAddress, Endpoint>() {
+        @Override public Endpoint apply(InetSocketAddress address) {
+          return ServerSets.toEndpoint(address);
+        }
+      };
+
+  /**
+   * Creates a server set that registers at a single path applying the given 
ACL to all nodes
+   * created in the path.
+   *
+   * @param zkClient ZooKeeper client to register with.
+   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet 
creates.
+   * @param zkPath Path to register at.  @see #create(ZooKeeperClient, 
java.util.Set)
+   * @return A server set that registers at {@code zkPath}.
+   */
+  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, 
String zkPath) {
+    return create(zkClient, acl, ImmutableSet.of(zkPath));
+  }
+
+  /**
+   * Creates a server set that registers at one or multiple paths applying the 
given ACL to all
+   * nodes created in the paths.
+   *
+   * @param zkClient ZooKeeper client to register with.
+   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet 
creates.
+   * @param zkPaths Paths to register at, must be non-empty.
+   * @return A server set that registers at the given {@code zkPath}s.
+   */
+  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, 
Set<String> zkPaths) {
+    Preconditions.checkNotNull(zkClient);
+    MorePreconditions.checkNotBlank(acl);
+    MorePreconditions.checkNotBlank(zkPaths);
+
+    if (zkPaths.size() == 1) {
+      return new ServerSetImpl(zkClient, acl, 
Iterables.getOnlyElement(zkPaths));
+    } else {
+      ImmutableList.Builder<ServerSet> builder = ImmutableList.builder();
+      for (String path : zkPaths) {
+        builder.add(new ServerSetImpl(zkClient, acl, path));
+      }
+      return new CompoundServerSet(builder.build());
+    }
+  }
+
+  /**
+   * Returns a serialized Thrift service instance object, with given endpoints 
and codec.
+   *
+   * @param serviceInstance the Thrift service instance object to be serialized
+   * @param codec the codec to use to serialize a Thrift service instance 
object
+   * @return byte array that contains a serialized Thrift service instance
+   */
+  public static byte[] serializeServiceInstance(
+      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws 
IOException {
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    codec.serialize(serviceInstance, output);
+    return output.toByteArray();
+  }
+
+  /**
+   * Serializes a service instance based on endpoints.
+   * @see #serializeServiceInstance(ServiceInstance, Codec)
+   *
+   * @param address the target address of the service instance
+   * @param additionalEndpoints additional endpoints of the service instance
+   * @param status service status
+   */
+  public static byte[] serializeServiceInstance(
+      InetSocketAddress address,
+      Map<String, Endpoint> additionalEndpoints,
+      Status status,
+      Codec<ServiceInstance> codec) throws IOException {
+
+    ServiceInstance serviceInstance =
+        new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
+    return serializeServiceInstance(serviceInstance, codec);
+  }
+
+  /**
+   * Creates a service instance object deserialized from byte array.
+   *
+   * @param data the byte array contains a serialized Thrift service instance
+   * @param codec the codec to use to deserialize the byte array
+   */
+  public static ServiceInstance deserializeServiceInstance(
+      byte[] data, Codec<ServiceInstance> codec) throws IOException {
+
+    return codec.deserialize(new ByteArrayInputStream(data));
+  }
+
+  /**
+   * Creates an endpoint for the given InetSocketAddress.
+   *
+   * @param address the target address to create the endpoint for
+   */
+  public static Endpoint toEndpoint(InetSocketAddress address) {
+    return new Endpoint(address.getHostName(), address.getPort());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
new file mode 100644
index 0000000..660f3d6
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Candidate.Leader;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+
+import org.apache.aurora.common.thrift.Status;
+
+/**
+ * A service that uses master election to only allow a single instance of the 
server to join
+ * the {@link ServerSet} at a time.
+ */
+public class SingletonService {
+  private static final Logger LOG = 
Logger.getLogger(SingletonService.class.getName());
+
+  @VisibleForTesting
+  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
+
+  /**
+   * Creates a candidate that can be combined with an existing server set to 
form a singleton
+   * service using {@link #SingletonService(ServerSet, Candidate)}.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param servicePath The path where service nodes live.
+   * @param acl The acl to apply to newly created candidate nodes and 
serverset nodes.
+   * @return A candidate that can be housed with a standard server set under a 
single zk path.
+   */
+  public static Candidate createSingletonCandidate(
+      ZooKeeperClient zkClient,
+      String servicePath,
+      Iterable<ACL> acl) {
+
+    return new CandidateImpl(new Group(zkClient, acl, servicePath, 
LEADER_ELECT_NODE_PREFIX));
+  }
+
+  private final ServerSet serverSet;
+  private final Candidate candidate;
+
+  /**
+   * Equivalent to {@link #SingletonService(ZooKeeperClient, String, 
Iterable)} with a default
+   * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}).
+   */
+  public SingletonService(ZooKeeperClient zkClient, String servicePath) {
+    this(zkClient, servicePath, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+  }
+
+  /**
+   * Creates a new singleton service, identified by {@code servicePath}.  All 
nodes related to the
+   * service (for both leader election and service registration) will live 
under the path and each
+   * node will be created with the supplied {@code acl}. Internally, two 
ZooKeeper {@code Group}s
+   * are used to manage a singleton service - one for leader election, and 
another for the
+   * {@code ServerSet} where the leader's endpoints are registered.  
Leadership election should
+   * guarantee that at most one instance will ever exist in the ServerSet at 
once.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param servicePath The path where service nodes live.
+   * @param acl The acl to apply to newly created candidate nodes and 
serverset nodes.
+   */
+  public SingletonService(ZooKeeperClient zkClient, String servicePath, 
Iterable<ACL> acl) {
+    this(
+        new ServerSetImpl(zkClient, new Group(zkClient, acl, servicePath)),
+        createSingletonCandidate(zkClient, servicePath, acl));
+  }
+
+  /**
+   * Creates a new singleton service that uses the supplied candidate to vie 
for leadership and then
+   * advertises itself in the given server set once elected.
+   *
+   * @param serverSet The server set to advertise in on election.
+   * @param candidate The candidacy to use to vie for election.
+   */
+  public SingletonService(ServerSet serverSet, Candidate candidate) {
+    this.serverSet = Preconditions.checkNotNull(serverSet);
+    this.candidate = Preconditions.checkNotNull(candidate);
+  }
+
+  /**
+   * Attempts to lead the singleton service.
+   *
+   * @param endpoint The primary endpoint to register as a leader candidate in 
the service.
+   * @param additionalEndpoints Additional endpoints that are available on the 
host.
+   * @param status deprecated, will be ignored entirely
+   * @param listener Handler to call when the candidate is elected or defeated.
+   * @throws Group.WatchException If there was a problem watching the 
ZooKeeper group.
+   * @throws Group.JoinException If there was a problem joining the ZooKeeper 
group.
+   * @throws InterruptedException If the thread watching/joining the group was 
interrupted.
+   * @deprecated The status field is deprecated. Please use
+   *            {@link #lead(InetSocketAddress, Map, LeadershipListener)}
+   */
+  @Deprecated
+  public void lead(final InetSocketAddress endpoint,
+                   final Map<String, InetSocketAddress> additionalEndpoints,
+                   final Status status,
+                   final LeadershipListener listener)
+                   throws Group.WatchException, Group.JoinException, 
InterruptedException {
+
+    if (status != Status.ALIVE) {
+      
LOG.severe("******************************************************************************");
+      LOG.severe("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.");
+      LOG.severe("JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + 
status);
+      
LOG.severe("******************************************************************************");
+    } else {
+      
LOG.warning("******************************************************************************");
+      LOG.warning("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.");
+      LOG.warning("Please use SingletonService.lead(InetSocketAddress, Map, 
LeadershipListener)");
+      
LOG.warning("******************************************************************************");
+    }
+
+    lead(endpoint, additionalEndpoints, listener);
+  }
+
+  /**
+   * Attempts to lead the singleton service.
+   *
+   * @param endpoint The primary endpoint to register as a leader candidate in 
the service.
+   * @param additionalEndpoints Additional endpoints that are available on the 
host.
+   * @param listener Handler to call when the candidate is elected or defeated.
+   * @throws Group.WatchException If there was a problem watching the 
ZooKeeper group.
+   * @throws Group.JoinException If there was a problem joining the ZooKeeper 
group.
+   * @throws InterruptedException If the thread watching/joining the group was 
interrupted.
+   */
+  public void lead(final InetSocketAddress endpoint,
+                   final Map<String, InetSocketAddress> additionalEndpoints,
+                   final LeadershipListener listener)
+                   throws Group.WatchException, Group.JoinException, 
InterruptedException {
+
+    Preconditions.checkNotNull(listener);
+
+    candidate.offerLeadership(new Leader() {
+      private ServerSet.EndpointStatus endpointStatus = null;
+      @Override public void onElected(final ExceptionalCommand<JoinException> 
abdicate) {
+        listener.onLeading(new LeaderControl() {
+          ServerSet.EndpointStatus endpointStatus = null;
+          final AtomicBoolean left = new AtomicBoolean(false);
+
+          // Methods are synchronized to prevent simultaneous invocations.
+          @Override public synchronized void advertise()
+              throws JoinException, InterruptedException {
+
+            Preconditions.checkState(!left.get(), "Cannot advertise after 
leaving.");
+            Preconditions.checkState(endpointStatus == null, "Cannot advertise 
more than once.");
+            endpointStatus = serverSet.join(endpoint, additionalEndpoints);
+          }
+
+          @Override public synchronized void leave() throws 
ServerSet.UpdateException, JoinException {
+            Preconditions.checkState(left.compareAndSet(false, true),
+                "Cannot leave more than once.");
+            if (endpointStatus != null) {
+              endpointStatus.leave();
+            }
+            abdicate.execute();
+          }
+        });
+      }
+
+      @Override public void onDefeated() {
+        listener.onDefeated(endpointStatus);
+      }
+    });
+  }
+
+  /**
+   * A listener to be notified of changes in the leadership status.
+   * Implementers should be careful to avoid blocking operations in these 
callbacks.
+   */
+  public interface LeadershipListener {
+
+    /**
+     * Notifies the listener that is is current leader.
+     *
+     * @param control A controller handle to advertise and/or leave advertised 
presence.
+     */
+    public void onLeading(LeaderControl control);
+
+    /**
+     * Notifies the listener that it is no longer leader.  The leader should 
take this opportunity
+     * to remove its advertisement gracefully.
+     *
+     * @param status A handle on the endpoint status for the advertised leader.
+     */
+    public void onDefeated(@Nullable ServerSet.EndpointStatus status);
+  }
+
+  /**
+   * A leadership listener that decorates another listener by automatically 
defeating a
+   * leader that has dropped its connection to ZooKeeper.
+   * Note that the decision to use this over session-based mutual exclusion 
should not be taken
+   * lightly.  Any momentary connection loss due to a flaky network or a 
ZooKeeper server process
+   * exit will cause a leader to abort.
+   */
+  public static class DefeatOnDisconnectLeader implements LeadershipListener {
+
+    private final LeadershipListener wrapped;
+    private Optional<LeaderControl> maybeControl = Optional.absent();
+
+    /**
+     * Creates a new leadership listener that will delegate calls to the 
wrapped listener, and
+     * invoke {@link #onDefeated(ServerSet.EndpointStatus)} if a ZooKeeper 
disconnect is observed while
+     * leading.
+     *
+     * @param zkClient The ZooKeeper client to watch for disconnect events.
+     * @param wrapped The leadership listener to wrap.
+     */
+    public DefeatOnDisconnectLeader(ZooKeeperClient zkClient, 
LeadershipListener wrapped) {
+      this.wrapped = Preconditions.checkNotNull(wrapped);
+
+      zkClient.register(new Watcher() {
+        @Override public void process(WatchedEvent event) {
+          if ((event.getType() == EventType.None)
+              && (event.getState() == KeeperState.Disconnected)) {
+            disconnected();
+          }
+        }
+      });
+    }
+
+    private synchronized void disconnected() {
+      if (maybeControl.isPresent()) {
+        LOG.warning("Disconnected from ZooKeeper while leading, committing 
suicide.");
+        try {
+          wrapped.onDefeated(null);
+          maybeControl.get().leave();
+        } catch (ServerSet.UpdateException e) {
+          LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e);
+        } catch (JoinException e) {
+          LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e);
+        } finally {
+          setControl(null);
+        }
+      } else {
+        LOG.info("Disconnected from ZooKeeper, but that's fine because I'm not 
the leader.");
+      }
+    }
+
+    private synchronized void setControl(@Nullable LeaderControl control) {
+      this.maybeControl = Optional.fromNullable(control);
+    }
+
+    @Override public void onLeading(final LeaderControl control) {
+      setControl(control);
+      wrapped.onLeading(new LeaderControl() {
+        @Override public void advertise() throws JoinException, 
InterruptedException {
+          control.advertise();
+        }
+
+        @Override public void leave() throws ServerSet.UpdateException, 
JoinException {
+          setControl(null);
+          control.leave();
+        }
+      });
+    }
+
+    @Override public void onDefeated(@Nullable ServerSet.EndpointStatus 
status) {
+      setControl(null);
+      wrapped.onDefeated(status);
+    }
+  }
+
+  /**
+   * A controller for the state of the leader.  This will be provided to the 
leader upon election,
+   * which allows the leader to decide when to advertise in the underlying 
{@link ServerSet} and
+   * terminate leadership at will.
+   */
+  public interface LeaderControl {
+
+    /**
+     * Advertises the leader's server presence to clients.
+     *
+     * @throws JoinException If there was an error advertising.
+     * @throws InterruptedException If interrupted while advertising.
+     */
+    void advertise() throws JoinException, InterruptedException;
+
+    /**
+     * Leaves candidacy for leadership, removing advertised server presence if 
applicable.
+     *
+     * @throws ServerSet.UpdateException If the leader's status could not be 
updated.
+     * @throws JoinException If there was an error abdicating from leader 
election.
+     */
+    void leave() throws ServerSet.UpdateException, JoinException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java
new file mode 100644
index 0000000..99c290e
--- /dev/null
+++ 
b/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.Commands;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+/**
+ * A server set that represents a fixed set of hosts.
+ * This may be composed under {@link CompoundServerSet} to ensure a minimum 
set of hosts is
+ * present.
+ * A static server set does not support joining, but will allow normal join 
calls and status update
+ * calls to be made.
+ */
+public class StaticServerSet implements ServerSet {
+
+  private static final Logger LOG = 
Logger.getLogger(StaticServerSet.class.getName());
+
+  private static final Function<Endpoint, ServiceInstance> 
ENDPOINT_TO_INSTANCE =
+      new Function<Endpoint, ServiceInstance>() {
+        @Override public ServiceInstance apply(Endpoint endpoint) {
+          return new ServiceInstance(endpoint, ImmutableMap.<String, 
Endpoint>of(), Status.ALIVE);
+        }
+      };
+
+  private final ImmutableSet<ServiceInstance> hosts;
+
+  /**
+   * Creates a static server set that will reply to monitor calls immediately 
and exactly once with
+   * the provided service instances.
+   *
+   * @param hosts Hosts in the static set.
+   */
+  public StaticServerSet(Set<ServiceInstance> hosts) {
+    this.hosts = ImmutableSet.copyOf(hosts);
+  }
+
+  /**
+   * Creates a static server set containing the provided endpoints (and no 
auxiliary ports) which
+   * will all be in the {@link Status#ALIVE} state.
+   *
+   * @param endpoints Endpoints in the static set.
+   * @return A static server set that will advertise the provided endpoints.
+   */
+  public static StaticServerSet fromEndpoints(Set<Endpoint> endpoints) {
+    return new StaticServerSet(
+        ImmutableSet.copyOf(Iterables.transform(endpoints, 
ENDPOINT_TO_INSTANCE)));
+  }
+
+  private EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> auxEndpoints,
+      Optional<Integer> shardId) {
+
+    LOG.warning("Attempt to join fixed server set ignored.");
+    ServiceInstance joining = new ServiceInstance(
+        ServerSets.toEndpoint(endpoint),
+        Maps.transformValues(auxEndpoints, ServerSets.TO_ENDPOINT),
+        Status.ALIVE);
+    if (shardId.isPresent()) {
+      joining.setShard(shardId.get());
+    }
+    if (!hosts.contains(joining)) {
+      LOG.log(Level.SEVERE,
+          "Joining instance " + joining + " does not match any member of the 
static set.");
+    }
+
+    return new EndpointStatus() {
+      @Override public void leave() throws UpdateException {
+        LOG.warning("Attempt to adjust state of fixed server set ignored.");
+      }
+
+      @Override public void update(Status status) throws UpdateException {
+        LOG.warning("Attempt to adjust state of fixed server set ignored.");
+      }
+    };
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> auxEndpoints,
+      Status status) {
+
+    LOG.warning("This method is deprecated. Please do not specify a status 
field.");
+    return join(endpoint, auxEndpoints, Optional.<Integer>absent());
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> auxEndpoints) {
+
+    LOG.warning("Joining a ServerSet without a shard ID is deprecated and will 
soon break.");
+    return join(endpoint, auxEndpoints, Optional.<Integer>absent());
+  }
+
+  @Override
+  public EndpointStatus join(
+      InetSocketAddress endpoint,
+      Map<String, InetSocketAddress> auxEndpoints,
+      int shardId) throws JoinException, InterruptedException {
+
+    return join(endpoint, auxEndpoints, Optional.of(shardId));
+  }
+
+  @Override
+  public Command watch(HostChangeMonitor<ServiceInstance> monitor) {
+    monitor.onChange(hosts);
+    return Commands.NOOP;
+  }
+
+  @Override
+  public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws 
MonitorException {
+    watch(monitor);
+  }
+}

Reply via email to