http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Group.java 
b/commons/src/main/java/com/twitter/common/zookeeper/Group.java
deleted file mode 100644
index 7a32cd0..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/Group.java
+++ /dev/null
@@ -1,708 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.data.ACL;
-
-import com.twitter.common.base.Command;
-import com.twitter.common.base.Commands;
-import com.twitter.common.base.ExceptionalSupplier;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.util.BackoffHelper;
-import 
com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-
-/**
- * This class exposes methods for joining and monitoring distributed groups.  
The groups this class
- * monitors are realized as persistent paths in ZooKeeper with ephemeral child 
nodes for
- * each member of a group.
- */
-public class Group {
-  private static final Logger LOG = Logger.getLogger(Group.class.getName());
-
-  private static final Supplier<byte[]> NO_MEMBER_DATA = 
Suppliers.ofInstance(null);
-  private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
-
-  private final ZooKeeperClient zkClient;
-  private final ImmutableList<ACL> acl;
-  private final String path;
-
-  private final NodeScheme nodeScheme;
-  private final Predicate<String> nodeNameFilter;
-
-  private final BackoffHelper backoffHelper;
-
-  /**
-   * Creates a group rooted at the given {@code path}.  Paths must be absolute 
and trailing or
-   * duplicate slashes will be normalized.  For example, all the following 
paths would create a
-   * group at the normalized path /my/distributed/group:
-   * <ul>
-   *   <li>/my/distributed/group
-   *   <li>/my/distributed/group/
-   *   <li>/my/distributed//group
-   * </ul>
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param acl the ACL to use for creating the persistent group path if it 
does not already exist
-   * @param path the absolute persistent path that represents this group
-   * @param nodeScheme the scheme that defines how nodes are created
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, 
NodeScheme nodeScheme) {
-    this.zkClient = Preconditions.checkNotNull(zkClient);
-    this.acl = ImmutableList.copyOf(acl);
-    this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path));
-
-    this.nodeScheme = Preconditions.checkNotNull(nodeScheme);
-    nodeNameFilter = new Predicate<String>() {
-      @Override public boolean apply(String nodeName) {
-        return Group.this.nodeScheme.isMember(nodeName);
-      }
-    };
-
-    backoffHelper = new BackoffHelper();
-  }
-
-  /**
-   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} 
with a
-   * {@code namePrefix} of 'member_'.
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
-    this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
-  }
-
-  /**
-   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, 
NodeScheme)} with a
-   * {@link DefaultScheme} using {@code namePrefix}.
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, 
String namePrefix) {
-    this(zkClient, acl, path, new DefaultScheme(namePrefix));
-  }
-
-  public String getMemberPath(String memberId) {
-    return path + "/" + MorePreconditions.checkNotBlank(memberId);
-  }
-
-  public String getPath() {
-    return path;
-  }
-
-  public String getMemberId(String nodePath) {
-    MorePreconditions.checkNotBlank(nodePath);
-    Preconditions.checkArgument(nodePath.startsWith(path + "/"),
-        "Not a member of this group[%s]: %s", path, nodePath);
-
-    String memberId = StringUtils.substringAfterLast(nodePath, "/");
-    Preconditions.checkArgument(nodeScheme.isMember(memberId),
-        "Not a group member: %s", memberId);
-    return memberId;
-  }
-
-  /**
-   * Returns the current list of group member ids by querying ZooKeeper 
synchronously.
-   *
-   * @return the ids of all the present members of this group
-   * @throws ZooKeeperConnectionException if there was a problem connecting to 
ZooKeeper
-   * @throws KeeperException if there was a problem reading this group's 
member ids
-   * @throws InterruptedException if this thread is interrupted listing the 
group members
-   */
-  public Iterable<String> getMemberIds()
-      throws ZooKeeperConnectionException, KeeperException, 
InterruptedException {
-    return Iterables.filter(zkClient.get().getChildren(path, false), 
nodeNameFilter);
-  }
-
-  /**
-   * Gets the data for one of this groups members by querying ZooKeeper 
synchronously.
-   *
-   * @param memberId the id of the member whose data to retrieve
-   * @return the data associated with the {@code memberId}
-   * @throws ZooKeeperConnectionException if there was a problem connecting to 
ZooKeeper
-   * @throws KeeperException if there was a problem reading this member's data
-   * @throws InterruptedException if this thread is interrupted retrieving the 
member data
-   */
-  public byte[] getMemberData(String memberId)
-      throws ZooKeeperConnectionException, KeeperException, 
InterruptedException {
-    return zkClient.get().getData(getMemberPath(memberId), false, null);
-  }
-
-  /**
-   * Represents membership in a distributed group.
-   */
-  public interface Membership {
-
-    /**
-     * Returns the persistent ZooKeeper path that represents this group.
-     */
-    String getGroupPath();
-
-    /**
-     * Returns the id (ZooKeeper node name) of this group member.  May change 
over time if the
-     * ZooKeeper session expires.
-     */
-    String getMemberId();
-
-    /**
-     * Returns the full ZooKeeper path to this group member.  May change over 
time if the
-     * ZooKeeper session expires.
-     */
-    String getMemberPath();
-
-    /**
-     * Updates the membership data synchronously using the {@code 
Supplier<byte[]>} passed to
-     * {@link Group#join()}.
-     *
-     * @return the new membership data
-     * @throws UpdateException if there was a problem updating the membership 
data
-     */
-    byte[] updateMemberData() throws UpdateException;
-
-    /**
-     * Cancels group membership by deleting the associated ZooKeeper member 
node.
-     *
-     * @throws JoinException if there is a problem deleting the node
-     */
-    void cancel() throws JoinException;
-  }
-
-  /**
-   * Indicates an error joining a group.
-   */
-  public static class JoinException extends Exception {
-    public JoinException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Indicates an error updating a group member's data.
-   */
-  public static class UpdateException extends Exception {
-    public UpdateException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Equivalent to calling {@code join(null, null)}.
-   */
-  public final Membership join() throws JoinException, InterruptedException {
-    return join(NO_MEMBER_DATA, null);
-  }
-
-  /**
-   * Equivalent to calling {@code join(memberData, null)}.
-   */
-  public final Membership join(Supplier<byte[]> memberData)
-      throws JoinException, InterruptedException {
-
-    return join(memberData, null);
-  }
-
-  /**
-   * Equivalent to calling {@code join(null, onLoseMembership)}.
-   */
-  public final Membership join(@Nullable final Command onLoseMembership)
-      throws JoinException, InterruptedException {
-
-    return join(NO_MEMBER_DATA, onLoseMembership);
-  }
-
-  /**
-   * Joins this group and returns the resulting Membership when successful.  
Membership will be
-   * automatically cancelled when the current jvm process dies; however the 
returned Membership
-   * object can be used to cancel membership earlier.  Unless
-   * {@link com.twitter.common.zookeeper.Group.Membership#cancel()} is called 
the membership will
-   * be maintained by re-establishing it silently in the background.
-   *
-   * <p>Any {@code memberData} given is persisted in the member node in 
ZooKeeper.  If an
-   * {@code onLoseMembership} callback is supplied, it will be notified each 
time this member loses
-   * membership in the group.
-   *
-   * @param memberData a supplier of the data to store in the member node
-   * @param onLoseMembership a callback to notify when membership is lost
-   * @return a Membership object with the member details
-   * @throws JoinException if there was a problem joining the group
-   * @throws InterruptedException if this thread is interrupted awaiting 
completion of the join
-   */
-  public final Membership join(Supplier<byte[]> memberData, @Nullable Command 
onLoseMembership)
-      throws JoinException, InterruptedException {
-
-    Preconditions.checkNotNull(memberData);
-    ensurePersistentGroupPath();
-
-    final ActiveMembership groupJoiner = new ActiveMembership(memberData, 
onLoseMembership);
-    return backoffHelper.doUntilResult(new ExceptionalSupplier<Membership, 
JoinException>() {
-      @Override public Membership get() throws JoinException {
-        try {
-          return groupJoiner.join();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new JoinException("Interrupted trying to join group at path: " 
+ path, e);
-        } catch (ZooKeeperConnectionException e) {
-          LOG.log(Level.WARNING, "Temporary error trying to join group at 
path: " + path, e);
-          return null;
-        } catch (KeeperException e) {
-          if (zkClient.shouldRetry(e)) {
-            LOG.log(Level.WARNING, "Temporary error trying to join group at 
path: " + path, e);
-            return null;
-          } else {
-            throw new JoinException("Problem joining partition group at path: 
" + path, e);
-          }
-        }
-      }
-    });
-  }
-
-  private void ensurePersistentGroupPath() throws JoinException, 
InterruptedException {
-    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, 
JoinException>() {
-      @Override public Boolean get() throws JoinException {
-        try {
-          ZooKeeperUtils.ensurePath(zkClient, acl, path);
-          return true;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new JoinException("Interrupted trying to ensure group at path: 
" + path, e);
-        } catch (ZooKeeperConnectionException e) {
-          LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", 
e);
-          return false;
-        } catch (KeeperException e) {
-          if (zkClient.shouldRetry(e)) {
-            LOG.log(Level.WARNING, "Temporary error ensuring path: " + path, 
e);
-            return false;
-          } else {
-            throw new JoinException("Problem ensuring group at path: " + path, 
e);
-          }
-        }
-      }
-    });
-  }
-
-  private class ActiveMembership implements Membership {
-    private final Supplier<byte[]> memberData;
-    private final Command onLoseMembership;
-    private String nodePath;
-    private String memberId;
-    private volatile boolean cancelled;
-    private byte[] membershipData;
-
-    public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command 
onLoseMembership) {
-      this.memberData = memberData;
-      this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : 
onLoseMembership;
-    }
-
-    @Override
-    public String getGroupPath() {
-      return path;
-    }
-
-    @Override
-    public synchronized String getMemberId() {
-      return memberId;
-    }
-
-    @Override
-    public synchronized String getMemberPath() {
-      return nodePath;
-    }
-
-    @Override
-    public synchronized byte[] updateMemberData() throws UpdateException {
-      byte[] membershipData = memberData.get();
-      if (!ArrayUtils.isEquals(this.membershipData, membershipData)) {
-        try {
-          zkClient.get().setData(nodePath, membershipData, 
ZooKeeperUtils.ANY_VERSION);
-          this.membershipData = membershipData;
-        } catch (KeeperException e) {
-          throw new UpdateException("Problem updating membership data.", e);
-        } catch (InterruptedException e) {
-          throw new UpdateException("Interrupted attempting to update 
membership data.", e);
-        } catch (ZooKeeperConnectionException e) {
-          throw new UpdateException(
-              "Could not connect to the ZooKeeper cluster to update membership 
data.", e);
-        }
-      }
-      return membershipData;
-    }
-
-    @Override
-    public synchronized void cancel() throws JoinException {
-      if (!cancelled) {
-        try {
-          backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, 
JoinException>() {
-            @Override public Boolean get() throws JoinException {
-              try {
-                zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
-                return true;
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new JoinException("Interrupted trying to cancel 
membership: " + nodePath, e);
-              } catch (ZooKeeperConnectionException e) {
-                LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, 
retrying", e);
-                return false;
-              } catch (NoNodeException e) {
-                LOG.info("Membership already cancelled, node at path: " + 
nodePath +
-                         " has been deleted");
-                return true;
-              } catch (KeeperException e) {
-                if (zkClient.shouldRetry(e)) {
-                  LOG.log(Level.WARNING, "Temporary error cancelling 
membership: " + nodePath, e);
-                  return false;
-                } else {
-                  throw new JoinException("Problem cancelling membership: " + 
nodePath, e);
-                }
-              }
-            }
-          });
-          cancelled = true; // Prevent auto-re-join logic from undoing this 
cancel.
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new JoinException("Problem cancelling membership: " + 
nodePath, e);
-        }
-      }
-    }
-
-    private class CancelledException extends IllegalStateException { /* marker 
*/ }
-
-    synchronized Membership join()
-        throws ZooKeeperConnectionException, InterruptedException, 
KeeperException {
-
-      if (cancelled) {
-        throw new CancelledException();
-      }
-
-      if (nodePath == null) {
-        // Re-join if our ephemeral node goes away due to session expiry - 
only needs to be
-        // registered once.
-        zkClient.registerExpirationHandler(new Command() {
-          @Override public void execute() {
-            tryJoin();
-          }
-        });
-      }
-
-      byte[] membershipData = memberData.get();
-      String nodeName = nodeScheme.createName(membershipData);
-      CreateMode createMode = nodeScheme.isSequential()
-          ? CreateMode.EPHEMERAL_SEQUENTIAL
-          : CreateMode.EPHEMERAL;
-      nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, 
acl, createMode);
-      memberId = Group.this.getMemberId(nodePath);
-      LOG.info("Set group member ID to " + memberId);
-      this.membershipData = membershipData;
-
-      // Re-join if our ephemeral node goes away due to maliciousness.
-      zkClient.get().exists(nodePath, new Watcher() {
-        @Override public void process(WatchedEvent event) {
-          if (event.getType() == EventType.NodeDeleted) {
-            tryJoin();
-          }
-        }
-      });
-
-      return this;
-    }
-
-    private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
-        new ExceptionalSupplier<Boolean, InterruptedException>() {
-          @Override public Boolean get() throws InterruptedException {
-            try {
-              join();
-              return true;
-            } catch (CancelledException e) {
-              // Lost a cancel race - that's ok.
-              return true;
-            } catch (ZooKeeperConnectionException e) {
-              LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, 
retrying", e);
-              return false;
-            } catch (KeeperException e) {
-              if (zkClient.shouldRetry(e)) {
-                LOG.log(Level.WARNING, "Temporary error re-joining group: " + 
path, e);
-                return false;
-              } else {
-                throw new IllegalStateException("Permanent problem re-joining 
group: " + path, e);
-              }
-            }
-          }
-        };
-
-    private synchronized void tryJoin() {
-      onLoseMembership.execute();
-      try {
-        backoffHelper.doUntilSuccess(tryJoin);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(
-            String.format("Interrupted while trying to re-join group: %s, 
giving up", path), e);
-      }
-    }
-  }
-
-  /**
-   * An interface to an object that listens for changes to a group's 
membership.
-   */
-  public interface GroupChangeListener {
-
-    /**
-     * Called whenever group membership changes with the new list of member 
ids.
-     *
-     * @param memberIds the current member ids
-     */
-    void onGroupChange(Iterable<String> memberIds);
-  }
-
-  /**
-   * An interface that dictates the scheme to use for storing and filtering 
nodes that represent
-   * members of a distributed group.
-   */
-  public interface NodeScheme {
-    /**
-     * Determines if a child node is a member of a group by examining the 
node's name.
-     *
-     * @param nodeName the name of a child node found in a group
-     * @return {@code true} if {@code nodeName} identifies a group member in 
this scheme
-     */
-    boolean isMember(String nodeName);
-
-    /**
-     * Generates a node name for the node representing this process in the 
distributed group.
-     *
-     * @param membershipData the data that will be stored in this node
-     * @return the name for the node that will represent this process in the 
group
-     */
-    String createName(byte[] membershipData);
-
-    /**
-     * Indicates whether this scheme needs ephemeral sequential nodes or just 
ephemeral nodes.
-     *
-     * @return {@code true} if this scheme requires sequential node names; 
{@code false} otherwise
-     */
-    boolean isSequential();
-  }
-
-  /**
-   * Indicates an error watching a group.
-   */
-  public static class WatchException extends Exception {
-    public WatchException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Watches this group for the lifetime of this jvm process.  This method 
will block until the
-   * current group members are available, notify the {@code 
groupChangeListener} and then return.
-   * All further changes to the group membership will cause notifications on a 
background thread.
-   *
-   * @param groupChangeListener the listener to notify of group membership 
change events
-   * @return A command which, when executed, will stop watching the group.
-   * @throws WatchException if there is a problem generating the 1st group 
membership list
-   * @throws InterruptedException if interrupted waiting to gather the 1st 
group membership list
-   */
-  public final Command watch(final GroupChangeListener groupChangeListener)
-      throws WatchException, InterruptedException {
-    Preconditions.checkNotNull(groupChangeListener);
-
-    try {
-      ensurePersistentGroupPath();
-    } catch (JoinException e) {
-      throw new WatchException("Failed to create group path: " + path, e);
-    }
-
-    final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
-    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, 
WatchException>() {
-      @Override public Boolean get() throws WatchException {
-        try {
-          groupMonitor.watchGroup();
-          return true;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new WatchException("Interrupted trying to watch group at path: 
" + path, e);
-        } catch (ZooKeeperConnectionException e) {
-          LOG.log(Level.WARNING, "Temporary error trying to watch group at 
path: " + path, e);
-          return null;
-        } catch (KeeperException e) {
-          if (zkClient.shouldRetry(e)) {
-            LOG.log(Level.WARNING, "Temporary error trying to watch group at 
path: " + path, e);
-            return null;
-          } else {
-            throw new WatchException("Problem trying to watch group at path: " 
+ path, e);
-          }
-        }
-      }
-    });
-    return new Command() {
-      @Override public void execute() {
-        groupMonitor.stopWatching();
-      }
-    };
-  }
-
-  /**
-   * Helps continuously monitor a group for membership changes.
-   */
-  private class GroupMonitor {
-    private final GroupChangeListener groupChangeListener;
-    private volatile boolean stopped = false;
-    private Set<String> members;
-
-    GroupMonitor(GroupChangeListener groupChangeListener) {
-      this.groupChangeListener = groupChangeListener;
-    }
-
-    private final Watcher groupWatcher = new Watcher() {
-      @Override public final void process(WatchedEvent event) {
-        if (event.getType() == EventType.NodeChildrenChanged) {
-          tryWatchGroup();
-        }
-      }
-    };
-
-    private final ExceptionalSupplier<Boolean, InterruptedException> 
tryWatchGroup =
-        new ExceptionalSupplier<Boolean, InterruptedException>() {
-          @Override public Boolean get() throws InterruptedException {
-            try {
-              watchGroup();
-              return true;
-            } catch (ZooKeeperConnectionException e) {
-              LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, 
retrying", e);
-              return false;
-            } catch (KeeperException e) {
-              if (zkClient.shouldRetry(e)) {
-                LOG.log(Level.WARNING, "Temporary error re-watching group: " + 
path, e);
-                return false;
-              } else {
-                throw new IllegalStateException("Permanent problem re-watching 
group: " + path, e);
-              }
-            }
-          }
-        };
-
-    private void tryWatchGroup() {
-      if (stopped) {
-        return;
-      }
-
-      try {
-        backoffHelper.doUntilSuccess(tryWatchGroup);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(
-            String.format("Interrupted while trying to re-watch group: %s, 
giving up", path), e);
-      }
-    }
-
-    private void watchGroup()
-        throws ZooKeeperConnectionException, InterruptedException, 
KeeperException {
-
-      if (stopped) {
-        return;
-      }
-
-      List<String> children = zkClient.get().getChildren(path, groupWatcher);
-      setMembers(Iterables.filter(children, nodeNameFilter));
-    }
-
-    private void stopWatching() {
-      // TODO(William Farner): Cancel the watch when
-      // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
-      LOG.info("Stopping watch on " + this);
-      stopped = true;
-    }
-
-    synchronized void setMembers(Iterable<String> members) {
-      if (stopped) {
-        LOG.info("Suppressing membership update, no longer watching " + this);
-        return;
-      }
-
-      if (this.members == null) {
-        // Reset our watch on the group if session expires - only needs to be 
registered once.
-        zkClient.registerExpirationHandler(new Command() {
-          @Override public void execute() {
-            tryWatchGroup();
-          }
-        });
-      }
-
-      Set<String> membership = ImmutableSet.copyOf(members);
-      if (!membership.equals(this.members)) {
-        groupChangeListener.onGroupChange(members);
-        this.members = membership;
-      }
-    }
-  }
-
-  /**
-   * Default naming scheme implementation. Stores nodes at [given path] + "/" 
+ [given prefix] +
-   * ZooKeeper-generated member ID. For example, if the path is 
"/discovery/servicename", and the
-   * prefix is "member_", the node's full path will look something like
-   * {@code /discovery/servicename/member_0000000007}.
-   */
-  public static class DefaultScheme implements NodeScheme {
-    private final String namePrefix;
-    private final Pattern namePattern;
-
-    /**
-     * Creates a sequential node scheme based on the given node name prefix.
-     *
-     * @param namePrefix the prefix for the names of the member nodes
-     */
-    public DefaultScheme(String namePrefix) {
-      this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
-      namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + 
"-?[0-9]+$");
-    }
-
-    @Override
-    public boolean isMember(String nodeName) {
-      return namePattern.matcher(nodeName).matches();
-    }
-
-    @Override
-    public String createName(byte[] membershipData) {
-      return namePrefix;
-    }
-
-    @Override
-    public boolean isSequential() {
-      return true;
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "Group " + path;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java 
b/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
deleted file mode 100644
index 4cdf5f2..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Ordering;
-import com.twitter.common.zookeeper.Group.GroupChangeListener;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.common.zookeeper.Group.Membership;
-import com.twitter.common.zookeeper.Group.UpdateException;
-import com.twitter.common.zookeeper.Group.WatchException;
-import org.apache.zookeeper.data.ACL;
-
-import javax.annotation.Nullable;
-import java.util.List;
-import java.util.logging.Logger;
-
-/**
- * A distributed mechanism for eventually arriving at an evenly partitioned 
space of long values.
- * A typical usage would have a client on each of several hosts joining a 
logical partition (a
- * "partition group") that represents some shared work.  Clients could then 
process a subset of a
- * full body of work by testing any given item of work with their partition 
filter.
- *
- * <p>Note that clients must be able to tolerate periods of duplicate 
processing by more than 1
- * partition as explained in {@link #join()}.
- *
- * @author John Sirois
- */
-public class Partitioner {
-
-  private static final Logger LOG = 
Logger.getLogger(Partitioner.class.getName());
-
-  private volatile int groupSize;
-  private volatile int groupIndex;
-  private final Group group;
-
-  /**
-   * Constructs a representation of a partition group but does not join it.  
Note that the partition
-   * group path will be created as a persistent zookeeper path if it does not 
already exist.
-   *
-   * @param zkClient a client to use for joining the partition group and 
watching its membership
-   * @param acl the acl for this partition group
-   * @param path a zookeeper path that represents the partition group
-   */
-  public Partitioner(ZooKeeperClient zkClient, List<ACL> acl, String path) {
-    group = new Group(zkClient, acl, path);
-  }
-
-  @VisibleForTesting
-  int getGroupSize() {
-    return groupSize;
-  }
-
-  /**
-   * Represents a slice of a partition group.  The partition is dynamic and 
will adjust its size as
-   * members join and leave its partition group.
-   */
-  public abstract static class Partition implements Predicate<Long>, 
Membership {
-
-    /**
-     * Returns {@code true} if the given {@code value} is a member of this 
partition at this time.
-     */
-    public abstract boolean isMember(long value);
-
-    /**
-     * Gets number of members in the group at this time.
-     *
-     * @return number of members in the ZK group at this time.
-     */
-    public abstract int getNumPartitions();
-
-    /**
-     * Evaluates partition membership based on the given {@code value}'s hash 
code.  If the value
-     * is null it is never a member of a partition.
-     */
-    boolean isMember(Object value) {
-      return (value != null) && isMember(value.hashCode());
-    }
-
-    /**
-     * Equivalent to {@link #isMember(long)} for all non-null values; however 
incurs unboxing
-     * overhead.
-     */
-    @Override
-    public boolean apply(@Nullable Long input) {
-      return (input != null) && isMember(input);
-    }
-  }
-
-  /**
-   * Attempts to join the partition group and claim a slice.  When successful, 
a predicate is
-   * returned that can be used to test whether or not an item belongs to this 
partition.  The
-   * predicate is dynamic such that as the group is further partitioned or 
partitions merge the
-   * predicate will claim a narrower or wider swath of the partition space 
respectively.  Partition
-   * creation and merging is not instantaneous and clients should expect 
independent partitions to
-   * claim ownership of some items when partition membership is in flux.  It 
is only in the steady
-   * state that a client should expect independent partitions to divide the 
partition space evenly
-   * and without overlap.
-   *
-   * <p>TODO(John Sirois): consider adding a version with a global timeout for 
the join operation.
-   *
-   * @return the partition representing the slice of the partition group this 
member can claim
-   * @throws JoinException if there was a problem joining the partition group
-   * @throws InterruptedException if interrupted while waiting to join the 
partition group
-   */
-  public final Partition join() throws JoinException, InterruptedException {
-    final Membership membership = group.join();
-    try {
-      group.watch(createGroupChangeListener(membership));
-    } catch (WatchException e) {
-      membership.cancel();
-      throw new JoinException("Problem establishing watch on group after 
joining it", e);
-    }
-    return new Partition() {
-      @Override public boolean isMember(long value) {
-        return (value % groupSize) == groupIndex;
-      }
-
-      @Override public int getNumPartitions() {
-        return groupSize;
-      }
-
-      @Override public String getGroupPath() {
-        return membership.getGroupPath();
-      }
-
-      @Override public String getMemberId() {
-        return membership.getMemberId();
-      }
-
-      @Override public String getMemberPath() {
-        return membership.getMemberPath();
-      }
-
-      @Override public byte[] updateMemberData() throws UpdateException {
-        return membership.updateMemberData();
-      }
-
-      @Override public void cancel() throws JoinException {
-        membership.cancel();
-      }
-    };
-  }
-
-  @VisibleForTesting GroupChangeListener createGroupChangeListener(final 
Membership membership) {
-    return new GroupChangeListener() {
-      @Override public void onGroupChange(Iterable<String> memberIds) {
-        List<String> members = Ordering.natural().sortedCopy(memberIds);
-        int newSize = members.size();
-        int newIndex = members.indexOf(membership.getMemberId());
-
-        LOG.info(String.format("Rebuilding group %s:%s [%d:%d]->[%d:%d]",
-            membership.getGroupPath(), members, groupSize, groupIndex, 
newSize, newIndex));
-
-        groupSize = newSize;
-        groupIndex = newIndex;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java 
b/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
deleted file mode 100644
index 2021f51..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import com.twitter.common.net.pool.DynamicHostSet;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-/**
- * A logical set of servers registered in ZooKeeper.  Intended to be used by 
both servers in a
- * common service and their clients.
- *
- * TODO(William Farner): Explore decoupling this from thrift.
- */
-public interface ServerSet extends DynamicHostSet<ServiceInstance> {
-
-  /**
-   * Attempts to join a server set for this logical service group.
-   *
-   * @param endpoint the primary service endpoint
-   * @param additionalEndpoints and additional endpoints keyed by their 
logical name
-   * @param status the current service status
-   * @return an EndpointStatus object that allows the endpoint to adjust its 
status
-   * @throws JoinException if there was a problem joining the server set
-   * @throws InterruptedException if interrupted while waiting to join the 
server set
-   * @deprecated The status field is deprecated. Please use {@link 
#join(InetSocketAddress, Map)}
-   */
-  @Deprecated
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints,
-      Status status) throws JoinException, InterruptedException;
-
-  /**
-   * Attempts to join a server set for this logical service group.
-   *
-   * @param endpoint the primary service endpoint
-   * @param additionalEndpoints and additional endpoints keyed by their 
logical name
-   * @return an EndpointStatus object that allows the endpoint to adjust its 
status
-   * @throws JoinException if there was a problem joining the server set
-   * @throws InterruptedException if interrupted while waiting to join the 
server set
-   */
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints)
-      throws JoinException, InterruptedException;
-
-  /**
-   * Attempts to join a server set for this logical service group.
-   *
-   * @param endpoint the primary service endpoint
-   * @param additionalEndpoints and additional endpoints keyed by their 
logical name
-   * @param shardId Unique shard identifier for this member of the service.
-   * @return an EndpointStatus object that allows the endpoint to adjust its 
status
-   * @throws JoinException if there was a problem joining the server set
-   * @throws InterruptedException if interrupted while waiting to join the 
server set
-   */
-  EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints,
-      int shardId) throws JoinException, InterruptedException;
-
-  /**
-   * A handle to a service endpoint's status data that allows updating it to 
track current events.
-   */
-  public interface EndpointStatus {
-
-    /**
-     * Attempts to update the status of the service endpoint associated with 
this endpoint.  If the
-     * {@code status} is {@link Status#DEAD} then the endpoint will be removed 
from the server set.
-     *
-     * @param status the current status of the endpoint
-     * @throws UpdateException if there was a problem writing the update
-     * @deprecated Support for mutable status is deprecated. Please use {@link 
#leave()}
-     */
-    @Deprecated
-    void update(Status status) throws UpdateException;
-
-    /**
-     * Removes the endpoint from the server set.
-     *
-     * @throws UpdateException if there was a problem leaving the ServerSet.
-     */
-    void leave() throws UpdateException;
-  }
-
-  /**
-   * Indicates an error updating a service's status information.
-   */
-  public static class UpdateException extends Exception {
-    public UpdateException(String message, Throwable cause) {
-      super(message, cause);
-    }
-
-    public UpdateException(String message) {
-      super(message);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java 
b/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
deleted file mode 100644
index b00015e..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java
+++ /dev/null
@@ -1,606 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.net.InetSocketAddress;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Throwables;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import com.google.gson.Gson;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-
-import com.twitter.common.args.Arg;
-import com.twitter.common.args.CmdLine;
-import com.twitter.common.base.Command;
-import com.twitter.common.base.Function;
-import com.twitter.common.base.Supplier;
-import com.twitter.common.io.Codec;
-import com.twitter.common.io.CompatibilityCodec;
-import com.twitter.common.io.ThriftCodec;
-import com.twitter.common.util.BackoffHelper;
-import com.twitter.common.zookeeper.Group.GroupChangeListener;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.common.zookeeper.Group.Membership;
-import com.twitter.common.zookeeper.Group.WatchException;
-import 
com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * ZooKeeper-backed implementation of {@link ServerSet}.
- */
-public class ServerSetImpl implements ServerSet {
-  private static final Logger LOG = 
Logger.getLogger(ServerSetImpl.class.getName());
-
-  @CmdLine(name = "serverset_encode_json",
-           help = "If true, use JSON for encoding server set information."
-               + " Defaults to true (use JSON).")
-  private static final Arg<Boolean> ENCODE_JSON = Arg.create(true);
-
-  private final ZooKeeperClient zkClient;
-  private final Group group;
-  private final Codec<ServiceInstance> codec;
-  private final BackoffHelper backoffHelper;
-
-  /**
-   * Creates a new ServerSet using open ZooKeeper node ACLs.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param path the name-service path of the service to connect to
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, String path) {
-    this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
-  }
-
-  /**
-   * Creates a new ServerSet for the given service {@code path}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param acl the ACL to use for creating the persistent group path if it 
does not already exist
-   * @param path the name-service path of the service to connect to
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String 
path) {
-    this(zkClient, new Group(zkClient, acl, path), createDefaultCodec());
-  }
-
-  /**
-   * Creates a new ServerSet using the given service {@code group}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param group the server group
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
-    this(zkClient, group, createDefaultCodec());
-  }
-
-  /**
-   * Creates a new ServerSet using the given service {@code group} and a 
custom {@code codec}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param group the server group
-   * @param codec a codec to use for serializing and de-serializing the 
ServiceInstance data to and
-   *     from a byte array
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Group group, 
Codec<ServiceInstance> codec) {
-    this.zkClient = checkNotNull(zkClient);
-    this.group = checkNotNull(group);
-    this.codec = checkNotNull(codec);
-
-    // TODO(John Sirois): Inject the helper so that backoff strategy can be 
configurable.
-    backoffHelper = new BackoffHelper();
-  }
-
-  @VisibleForTesting
-  ZooKeeperClient getZkClient() {
-    return zkClient;
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints)
-      throws JoinException, InterruptedException {
-
-    LOG.log(Level.WARNING,
-        "Joining a ServerSet without a shard ID is deprecated and will soon 
break.");
-    return join(endpoint, additionalEndpoints, Optional.<Integer>absent());
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints,
-      int shardId) throws JoinException, InterruptedException {
-
-    return join(endpoint, additionalEndpoints, Optional.of(shardId));
-  }
-
-  private EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints,
-      Optional<Integer> shardId) throws JoinException, InterruptedException {
-
-    checkNotNull(endpoint);
-    checkNotNull(additionalEndpoints);
-
-    final MemberStatus memberStatus =
-        new MemberStatus(endpoint, additionalEndpoints, shardId);
-    Supplier<byte[]> serviceInstanceSupplier = new Supplier<byte[]>() {
-      @Override public byte[] get() {
-        return memberStatus.serializeServiceInstance();
-      }
-    };
-    final Membership membership = group.join(serviceInstanceSupplier);
-
-    return new EndpointStatus() {
-      @Override public void update(Status status) throws UpdateException {
-        checkNotNull(status);
-        LOG.warning("This method is deprecated. Please use leave() instead.");
-        if (status == Status.DEAD) {
-          leave();
-        } else {
-          LOG.warning("Status update has been ignored");
-        }
-      }
-
-      @Override public void leave() throws UpdateException {
-        memberStatus.leave(membership);
-      }
-    };
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints,
-      Status status) throws JoinException, InterruptedException {
-
-    LOG.warning("This method is deprecated. Please do not specify a status 
field.");
-    if (status != Status.ALIVE) {
-      
LOG.severe("**************************************************************************\n"
-          + "WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.\n"
-          + "JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status
-          + 
"\n**************************************************************************");
-    }
-    return join(endpoint, additionalEndpoints);
-  }
-
-  @Override
-  public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws 
MonitorException {
-    ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, 
monitor);
-    try {
-      return serverSetWatcher.watch();
-    } catch (WatchException e) {
-      throw new MonitorException("ZooKeeper watch failed.", e);
-    } catch (InterruptedException e) {
-      throw new MonitorException("Interrupted while watching ZooKeeper.", e);
-    }
-  }
-
-  @Override
-  public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws 
MonitorException {
-    LOG.warning("This method is deprecated. Please use watch instead.");
-    watch(monitor);
-  }
-
-  private class MemberStatus {
-    private final InetSocketAddress endpoint;
-    private final Map<String, InetSocketAddress> additionalEndpoints;
-    private final Optional<Integer> shardId;
-
-    private MemberStatus(
-        InetSocketAddress endpoint,
-        Map<String, InetSocketAddress> additionalEndpoints,
-        Optional<Integer> shardId) {
-
-      this.endpoint = endpoint;
-      this.additionalEndpoints = additionalEndpoints;
-      this.shardId = shardId;
-    }
-
-    synchronized void leave(Membership membership) throws UpdateException {
-      try {
-        membership.cancel();
-      } catch (JoinException e) {
-        throw new UpdateException(
-            "Failed to auto-cancel group membership on transition to DEAD 
status", e);
-      }
-    }
-
-    byte[] serializeServiceInstance() {
-      ServiceInstance serviceInstance = new ServiceInstance(
-          ServerSets.toEndpoint(endpoint),
-          Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
-          Status.ALIVE);
-
-      if (shardId.isPresent()) {
-        serviceInstance.setShard(shardId.get());
-      }
-
-      LOG.fine("updating endpoint data to:\n\t" + serviceInstance);
-      try {
-        return ServerSets.serializeServiceInstance(serviceInstance, codec);
-      } catch (IOException e) {
-        throw new IllegalStateException("Unexpected problem serializing thrift 
struct " +
-            serviceInstance + "to a byte[]", e);
-      }
-    }
-  }
-
-  private static class ServiceInstanceFetchException extends RuntimeException {
-    ServiceInstanceFetchException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  private static class ServiceInstanceDeletedException extends 
RuntimeException {
-    ServiceInstanceDeletedException(String path) {
-      super(path);
-    }
-  }
-
-  private class ServerSetWatcher {
-    private final ZooKeeperClient zkClient;
-    private final HostChangeMonitor<ServiceInstance> monitor;
-    @Nullable private ImmutableSet<ServiceInstance> serverSet;
-
-    ServerSetWatcher(ZooKeeperClient zkClient, 
HostChangeMonitor<ServiceInstance> monitor) {
-      this.zkClient = zkClient;
-      this.monitor = monitor;
-    }
-
-    public Command watch() throws WatchException, InterruptedException {
-      Watcher onExpirationWatcher = zkClient.registerExpirationHandler(new 
Command() {
-        @Override public void execute() {
-          // Servers may have changed Status while we were disconnected from 
ZooKeeper, check and
-          // re-register our node watches.
-          rebuildServerSet();
-        }
-      });
-
-      try {
-        return group.watch(new GroupChangeListener() {
-          @Override public void onGroupChange(Iterable<String> memberIds) {
-            notifyGroupChange(memberIds);
-          }
-        });
-      } catch (WatchException e) {
-        zkClient.unregister(onExpirationWatcher);
-        throw e;
-      } catch (InterruptedException e) {
-        zkClient.unregister(onExpirationWatcher);
-        throw e;
-      }
-    }
-
-    private ServiceInstance getServiceInstance(final String nodePath) {
-      try {
-        return backoffHelper.doUntilResult(new Supplier<ServiceInstance>() {
-          @Override public ServiceInstance get() {
-            try {
-              byte[] data = zkClient.get().getData(nodePath, false, null);
-              return ServerSets.deserializeServiceInstance(data, codec);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              throw new ServiceInstanceFetchException(
-                  "Interrupted updating service data for: " + nodePath, e);
-            } catch (ZooKeeperConnectionException e) {
-              LOG.log(Level.WARNING,
-                  "Temporary error trying to updating service data for: " + 
nodePath, e);
-              return null;
-            } catch (NoNodeException e) {
-              invalidateNodePath(nodePath);
-              throw new ServiceInstanceDeletedException(nodePath);
-            } catch (KeeperException e) {
-              if (zkClient.shouldRetry(e)) {
-                LOG.log(Level.WARNING,
-                    "Temporary error trying to update service data for: " + 
nodePath, e);
-                return null;
-              } else {
-                throw new ServiceInstanceFetchException(
-                    "Failed to update service data for: " + nodePath, e);
-              }
-            } catch (IOException e) {
-              throw new ServiceInstanceFetchException(
-                  "Failed to deserialize the ServiceInstance data for: " + 
nodePath, e);
-            }
-          }
-        });
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ServiceInstanceFetchException(
-            "Interrupted trying to update service data for: " + nodePath, e);
-      }
-    }
-
-    private final LoadingCache<String, ServiceInstance> servicesByMemberId =
-        CacheBuilder.newBuilder().build(new CacheLoader<String, 
ServiceInstance>() {
-          @Override public ServiceInstance load(String memberId) {
-            return getServiceInstance(group.getMemberPath(memberId));
-          }
-        });
-
-    private void rebuildServerSet() {
-      Set<String> memberIds = 
ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
-      servicesByMemberId.invalidateAll();
-      notifyGroupChange(memberIds);
-    }
-
-    private String invalidateNodePath(String deletedPath) {
-      String memberId = group.getMemberId(deletedPath);
-      servicesByMemberId.invalidate(memberId);
-      return memberId;
-    }
-
-    private final Function<String, ServiceInstance> MAYBE_FETCH_NODE =
-        new Function<String, ServiceInstance>() {
-          @Override public ServiceInstance apply(String memberId) {
-            // This get will trigger a fetch
-            try {
-              return servicesByMemberId.getUnchecked(memberId);
-            } catch (UncheckedExecutionException e) {
-              Throwable cause = e.getCause();
-              if (!(cause instanceof ServiceInstanceDeletedException)) {
-                Throwables.propagateIfInstanceOf(cause, 
ServiceInstanceFetchException.class);
-                throw new IllegalStateException(
-                    "Unexpected error fetching member data for: " + memberId, 
e);
-              }
-              return null;
-            }
-          }
-        };
-
-    private synchronized void notifyGroupChange(Iterable<String> memberIds) {
-      ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
-      Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
-
-      // Ignore no-op state changes except for the 1st when we've seen no 
group yet.
-      if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
-        SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, 
newMemberIds);
-        // Implicit removal from servicesByMemberId.
-        existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
-
-        Iterable<ServiceInstance> serviceInstances = Iterables.filter(
-            Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), 
Predicates.notNull());
-
-        notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
-      }
-    }
-
-    private void notifyServerSetChange(ImmutableSet<ServiceInstance> 
currentServerSet) {
-      // ZK nodes may have changed if there was a session expiry for a server 
in the server set, but
-      // if the server's status has not changed, we can skip any onChange 
updates.
-      if (!currentServerSet.equals(serverSet)) {
-        if (currentServerSet.isEmpty()) {
-          LOG.warning("server set empty for path " + group.getPath());
-        } else {
-          if (LOG.isLoggable(Level.INFO)) {
-            if (serverSet == null) {
-              LOG.info("received initial membership " + currentServerSet);
-            } else {
-              logChange(Level.INFO, currentServerSet);
-            }
-          }
-        }
-        serverSet = currentServerSet;
-        monitor.onChange(serverSet);
-      }
-    }
-
-    private void logChange(Level level, ImmutableSet<ServiceInstance> 
newServerSet) {
-      StringBuilder message = new StringBuilder("server set " + 
group.getPath() + " change: ");
-      if (serverSet.size() != newServerSet.size()) {
-        message.append("from ").append(serverSet.size())
-            .append(" members to ").append(newServerSet.size());
-      }
-
-      Joiner joiner = Joiner.on("\n\t\t");
-
-      SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
-      if (!left.isEmpty()) {
-        message.append("\n\tleft:\n\t\t").append(joiner.join(left));
-      }
-
-      SetView<ServiceInstance> joined = Sets.difference(newServerSet, 
serverSet);
-      if (!joined.isEmpty()) {
-        message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
-      }
-
-      LOG.log(level, message.toString());
-    }
-  }
-
-  private static class EndpointSchema {
-    final String host;
-    final Integer port;
-
-    EndpointSchema(Endpoint endpoint) {
-      Preconditions.checkNotNull(endpoint);
-      this.host = endpoint.getHost();
-      this.port = endpoint.getPort();
-    }
-
-    String getHost() {
-      return host;
-    }
-
-    Integer getPort() {
-      return port;
-    }
-  }
-
-  private static class ServiceInstanceSchema {
-    final EndpointSchema serviceEndpoint;
-    final Map<String, EndpointSchema> additionalEndpoints;
-    final Status status;
-    final Integer shard;
-
-    ServiceInstanceSchema(ServiceInstance instance) {
-      this.serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint());
-      if (instance.getAdditionalEndpoints() != null) {
-        this.additionalEndpoints = Maps.transformValues(
-            instance.getAdditionalEndpoints(),
-            new Function<Endpoint, EndpointSchema>() {
-              @Override public EndpointSchema apply(Endpoint endpoint) {
-                return new EndpointSchema(endpoint);
-              }
-            }
-        );
-      } else {
-        this.additionalEndpoints = Maps.newHashMap();
-      }
-      this.status  = instance.getStatus();
-      this.shard = instance.isSetShard() ? instance.getShard() : null;
-    }
-
-    EndpointSchema getServiceEndpoint() {
-      return serviceEndpoint;
-    }
-
-    Map<String, EndpointSchema> getAdditionalEndpoints() {
-      return additionalEndpoints;
-    }
-
-    Status getStatus() {
-      return status;
-    }
-
-    Integer getShard() {
-      return shard;
-    }
-  }
-
-  /**
-   * An adapted JSON codec that makes use of {@link ServiceInstanceSchema} to 
circumvent the
-   * __isset_bit_vector internal thrift struct field that tracks primitive 
types.
-   */
-  private static class AdaptedJsonCodec implements Codec<ServiceInstance> {
-    private static final Charset ENCODING = Charsets.UTF_8;
-    private static final Class<ServiceInstanceSchema> CLASS = 
ServiceInstanceSchema.class;
-    private final Gson gson = new Gson();
-
-    @Override
-    public void serialize(ServiceInstance instance, OutputStream sink) throws 
IOException {
-      Writer w = new OutputStreamWriter(sink, ENCODING);
-      gson.toJson(new ServiceInstanceSchema(instance), CLASS, w);
-      w.flush();
-    }
-
-    @Override
-    public ServiceInstance deserialize(InputStream source) throws IOException {
-      ServiceInstanceSchema output = gson.fromJson(new 
InputStreamReader(source, ENCODING), CLASS);
-      Endpoint primary = new Endpoint(
-          output.getServiceEndpoint().getHost(), 
output.getServiceEndpoint().getPort());
-      Map<String, Endpoint> additional = Maps.transformValues(
-          output.getAdditionalEndpoints(),
-          new Function<EndpointSchema, Endpoint>() {
-            @Override public Endpoint apply(EndpointSchema endpoint) {
-              return new Endpoint(endpoint.getHost(), endpoint.getPort());
-            }
-          }
-      );
-      ServiceInstance instance =
-          new ServiceInstance(primary, ImmutableMap.copyOf(additional), 
output.getStatus());
-      if (output.getShard() != null) {
-        instance.setShard(output.getShard());
-      }
-      return instance;
-    }
-  }
-
-  private static Codec<ServiceInstance> createCodec(final boolean 
useJsonEncoding) {
-    final Codec<ServiceInstance> json = new AdaptedJsonCodec();
-    final Codec<ServiceInstance> thrift =
-        ThriftCodec.create(ServiceInstance.class, ThriftCodec.BINARY_PROTOCOL);
-    final Predicate<byte[]> recognizer = new Predicate<byte[]>() {
-      public boolean apply(byte[] input) {
-        return (input.length > 1 && input[0] == '{' && input[1] == '\"') == 
useJsonEncoding;
-      }
-    };
-
-    if (useJsonEncoding) {
-      return CompatibilityCodec.create(json, thrift, 2, recognizer);
-    }
-    return CompatibilityCodec.create(thrift, json, 2, recognizer);
-  }
-
-  /**
-   * Creates a codec for {@link ServiceInstance} objects that uses Thrift 
binary encoding, and can
-   * decode both Thrift and JSON encodings.
-   *
-   * @return a new codec instance.
-   */
-  public static Codec<ServiceInstance> createThriftCodec() {
-    return createCodec(false);
-  }
-
-  /**
-   * Creates a codec for {@link ServiceInstance} objects that uses JSON 
encoding, and can decode
-   * both Thrift and JSON encodings.
-   *
-   * @return a new codec instance.
-   */
-  public static Codec<ServiceInstance> createJsonCodec() {
-    return createCodec(true);
-  }
-
-  /**
-   * Returns a codec for {@link ServiceInstance} objects that uses either the 
Thrift or the JSON
-   * encoding, depending on whether the command line argument 
<tt>serverset_json_encofing</tt> is
-   * set to <tt>true</tt>, and can decode both Thrift and JSON encodings.
-   *
-   * @return a new codec instance.
-   */
-  public static Codec<ServiceInstance> createDefaultCodec() {
-    return createCodec(ENCODE_JSON.get());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java 
b/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
deleted file mode 100644
index fc7ea88..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.zookeeper.data.ACL;
-
-import com.twitter.common.base.Function;
-import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.io.Codec;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
-
-/**
- * Common ServerSet related functions
- */
-public class ServerSets {
-
-  private ServerSets() {
-    // Utility class.
-  }
-
-  /**
-   * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
-   */
-  public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
-      new Function<InetSocketAddress, Endpoint>() {
-        @Override public Endpoint apply(InetSocketAddress address) {
-          return ServerSets.toEndpoint(address);
-        }
-      };
-
-  /**
-   * Creates a server set that registers at a single path applying the given 
ACL to all nodes
-   * created in the path.
-   *
-   * @param zkClient ZooKeeper client to register with.
-   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet 
creates.
-   * @param zkPath Path to register at.  @see #create(ZooKeeperClient, 
java.util.Set)
-   * @return A server set that registers at {@code zkPath}.
-   */
-  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, 
String zkPath) {
-    return create(zkClient, acl, ImmutableSet.of(zkPath));
-  }
-
-  /**
-   * Creates a server set that registers at one or multiple paths applying the 
given ACL to all
-   * nodes created in the paths.
-   *
-   * @param zkClient ZooKeeper client to register with.
-   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet 
creates.
-   * @param zkPaths Paths to register at, must be non-empty.
-   * @return A server set that registers at the given {@code zkPath}s.
-   */
-  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, 
Set<String> zkPaths) {
-    Preconditions.checkNotNull(zkClient);
-    MorePreconditions.checkNotBlank(acl);
-    MorePreconditions.checkNotBlank(zkPaths);
-
-    if (zkPaths.size() == 1) {
-      return new ServerSetImpl(zkClient, acl, 
Iterables.getOnlyElement(zkPaths));
-    } else {
-      ImmutableList.Builder<ServerSet> builder = ImmutableList.builder();
-      for (String path : zkPaths) {
-        builder.add(new ServerSetImpl(zkClient, acl, path));
-      }
-      return new CompoundServerSet(builder.build());
-    }
-  }
-
-  /**
-   * Returns a serialized Thrift service instance object, with given endpoints 
and codec.
-   *
-   * @param serviceInstance the Thrift service instance object to be serialized
-   * @param codec the codec to use to serialize a Thrift service instance 
object
-   * @return byte array that contains a serialized Thrift service instance
-   */
-  public static byte[] serializeServiceInstance(
-      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws 
IOException {
-
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    codec.serialize(serviceInstance, output);
-    return output.toByteArray();
-  }
-
-  /**
-   * Serializes a service instance based on endpoints.
-   * @see #serializeServiceInstance(ServiceInstance, Codec)
-   *
-   * @param address the target address of the service instance
-   * @param additionalEndpoints additional endpoints of the service instance
-   * @param status service status
-   */
-  public static byte[] serializeServiceInstance(
-      InetSocketAddress address,
-      Map<String, Endpoint> additionalEndpoints,
-      Status status,
-      Codec<ServiceInstance> codec) throws IOException {
-
-    ServiceInstance serviceInstance =
-        new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
-    return serializeServiceInstance(serviceInstance, codec);
-  }
-
-  /**
-   * Creates a service instance object deserialized from byte array.
-   *
-   * @param data the byte array contains a serialized Thrift service instance
-   * @param codec the codec to use to deserialize the byte array
-   */
-  public static ServiceInstance deserializeServiceInstance(
-      byte[] data, Codec<ServiceInstance> codec) throws IOException {
-
-    return codec.deserialize(new ByteArrayInputStream(data));
-  }
-
-  /**
-   * Creates an endpoint for the given InetSocketAddress.
-   *
-   * @param address the target address to create the endpoint for
-   */
-  public static Endpoint toEndpoint(InetSocketAddress address) {
-    return new Endpoint(address.getHostName(), address.getPort());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java 
b/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
deleted file mode 100644
index 91e28f2..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-
-import com.twitter.common.base.ExceptionalCommand;
-import com.twitter.common.zookeeper.Candidate.Leader;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.common.zookeeper.ServerSet.EndpointStatus;
-import com.twitter.common.zookeeper.ServerSet.UpdateException;
-import com.twitter.thrift.Status;
-
-/**
- * A service that uses master election to only allow a single instance of the 
server to join
- * the {@link ServerSet} at a time.
- */
-public class SingletonService {
-  private static final Logger LOG = 
Logger.getLogger(SingletonService.class.getName());
-
-  @VisibleForTesting
-  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
-
-  /**
-   * Creates a candidate that can be combined with an existing server set to 
form a singleton
-   * service using {@link #SingletonService(ServerSet, Candidate)}.
-   *
-   * @param zkClient The ZooKeeper client to use.
-   * @param servicePath The path where service nodes live.
-   * @param acl The acl to apply to newly created candidate nodes and 
serverset nodes.
-   * @return A candidate that can be housed with a standard server set under a 
single zk path.
-   */
-  public static Candidate createSingletonCandidate(
-      ZooKeeperClient zkClient,
-      String servicePath,
-      Iterable<ACL> acl) {
-
-    return new CandidateImpl(new Group(zkClient, acl, servicePath, 
LEADER_ELECT_NODE_PREFIX));
-  }
-
-  private final ServerSet serverSet;
-  private final Candidate candidate;
-
-  /**
-   * Equivalent to {@link #SingletonService(ZooKeeperClient, String, 
Iterable)} with a default
-   * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}).
-   */
-  public SingletonService(ZooKeeperClient zkClient, String servicePath) {
-    this(zkClient, servicePath, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-  }
-
-  /**
-   * Creates a new singleton service, identified by {@code servicePath}.  All 
nodes related to the
-   * service (for both leader election and service registration) will live 
under the path and each
-   * node will be created with the supplied {@code acl}. Internally, two 
ZooKeeper {@code Group}s
-   * are used to manage a singleton service - one for leader election, and 
another for the
-   * {@code ServerSet} where the leader's endpoints are registered.  
Leadership election should
-   * guarantee that at most one instance will ever exist in the ServerSet at 
once.
-   *
-   * @param zkClient The ZooKeeper client to use.
-   * @param servicePath The path where service nodes live.
-   * @param acl The acl to apply to newly created candidate nodes and 
serverset nodes.
-   */
-  public SingletonService(ZooKeeperClient zkClient, String servicePath, 
Iterable<ACL> acl) {
-    this(
-        new ServerSetImpl(zkClient, new Group(zkClient, acl, servicePath)),
-        createSingletonCandidate(zkClient, servicePath, acl));
-  }
-
-  /**
-   * Creates a new singleton service that uses the supplied candidate to vie 
for leadership and then
-   * advertises itself in the given server set once elected.
-   *
-   * @param serverSet The server set to advertise in on election.
-   * @param candidate The candidacy to use to vie for election.
-   */
-  public SingletonService(ServerSet serverSet, Candidate candidate) {
-    this.serverSet = Preconditions.checkNotNull(serverSet);
-    this.candidate = Preconditions.checkNotNull(candidate);
-  }
-
-  /**
-   * Attempts to lead the singleton service.
-   *
-   * @param endpoint The primary endpoint to register as a leader candidate in 
the service.
-   * @param additionalEndpoints Additional endpoints that are available on the 
host.
-   * @param status deprecated, will be ignored entirely
-   * @param listener Handler to call when the candidate is elected or defeated.
-   * @throws Group.WatchException If there was a problem watching the 
ZooKeeper group.
-   * @throws Group.JoinException If there was a problem joining the ZooKeeper 
group.
-   * @throws InterruptedException If the thread watching/joining the group was 
interrupted.
-   * @deprecated The status field is deprecated. Please use
-   *            {@link #lead(InetSocketAddress, Map, LeadershipListener)}
-   */
-  @Deprecated
-  public void lead(final InetSocketAddress endpoint,
-                   final Map<String, InetSocketAddress> additionalEndpoints,
-                   final Status status,
-                   final LeadershipListener listener)
-                   throws Group.WatchException, Group.JoinException, 
InterruptedException {
-
-    if (status != Status.ALIVE) {
-      
LOG.severe("******************************************************************************");
-      LOG.severe("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.");
-      LOG.severe("JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + 
status);
-      
LOG.severe("******************************************************************************");
-    } else {
-      
LOG.warning("******************************************************************************");
-      LOG.warning("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.");
-      LOG.warning("Please use SingletonService.lead(InetSocketAddress, Map, 
LeadershipListener)");
-      
LOG.warning("******************************************************************************");
-    }
-
-    lead(endpoint, additionalEndpoints, listener);
-  }
-
-  /**
-   * Attempts to lead the singleton service.
-   *
-   * @param endpoint The primary endpoint to register as a leader candidate in 
the service.
-   * @param additionalEndpoints Additional endpoints that are available on the 
host.
-   * @param listener Handler to call when the candidate is elected or defeated.
-   * @throws Group.WatchException If there was a problem watching the 
ZooKeeper group.
-   * @throws Group.JoinException If there was a problem joining the ZooKeeper 
group.
-   * @throws InterruptedException If the thread watching/joining the group was 
interrupted.
-   */
-  public void lead(final InetSocketAddress endpoint,
-                   final Map<String, InetSocketAddress> additionalEndpoints,
-                   final LeadershipListener listener)
-                   throws Group.WatchException, Group.JoinException, 
InterruptedException {
-
-    Preconditions.checkNotNull(listener);
-
-    candidate.offerLeadership(new Leader() {
-      private EndpointStatus endpointStatus = null;
-      @Override public void onElected(final ExceptionalCommand<JoinException> 
abdicate) {
-        listener.onLeading(new LeaderControl() {
-          EndpointStatus endpointStatus = null;
-          final AtomicBoolean left = new AtomicBoolean(false);
-
-          // Methods are synchronized to prevent simultaneous invocations.
-          @Override public synchronized void advertise()
-              throws JoinException, InterruptedException {
-
-            Preconditions.checkState(!left.get(), "Cannot advertise after 
leaving.");
-            Preconditions.checkState(endpointStatus == null, "Cannot advertise 
more than once.");
-            endpointStatus = serverSet.join(endpoint, additionalEndpoints);
-          }
-
-          @Override public synchronized void leave() throws UpdateException, 
JoinException {
-            Preconditions.checkState(left.compareAndSet(false, true),
-                "Cannot leave more than once.");
-            if (endpointStatus != null) {
-              endpointStatus.leave();
-            }
-            abdicate.execute();
-          }
-        });
-      }
-
-      @Override public void onDefeated() {
-        listener.onDefeated(endpointStatus);
-      }
-    });
-  }
-
-  /**
-   * A listener to be notified of changes in the leadership status.
-   * Implementers should be careful to avoid blocking operations in these 
callbacks.
-   */
-  public interface LeadershipListener {
-
-    /**
-     * Notifies the listener that is is current leader.
-     *
-     * @param control A controller handle to advertise and/or leave advertised 
presence.
-     */
-    public void onLeading(LeaderControl control);
-
-    /**
-     * Notifies the listener that it is no longer leader.  The leader should 
take this opportunity
-     * to remove its advertisement gracefully.
-     *
-     * @param status A handle on the endpoint status for the advertised leader.
-     */
-    public void onDefeated(@Nullable EndpointStatus status);
-  }
-
-  /**
-   * A leadership listener that decorates another listener by automatically 
defeating a
-   * leader that has dropped its connection to ZooKeeper.
-   * Note that the decision to use this over session-based mutual exclusion 
should not be taken
-   * lightly.  Any momentary connection loss due to a flaky network or a 
ZooKeeper server process
-   * exit will cause a leader to abort.
-   */
-  public static class DefeatOnDisconnectLeader implements LeadershipListener {
-
-    private final LeadershipListener wrapped;
-    private Optional<LeaderControl> maybeControl = Optional.absent();
-
-    /**
-     * Creates a new leadership listener that will delegate calls to the 
wrapped listener, and
-     * invoke {@link #onDefeated(EndpointStatus)} if a ZooKeeper disconnect is 
observed while
-     * leading.
-     *
-     * @param zkClient The ZooKeeper client to watch for disconnect events.
-     * @param wrapped The leadership listener to wrap.
-     */
-    public DefeatOnDisconnectLeader(ZooKeeperClient zkClient, 
LeadershipListener wrapped) {
-      this.wrapped = Preconditions.checkNotNull(wrapped);
-
-      zkClient.register(new Watcher() {
-        @Override public void process(WatchedEvent event) {
-          if ((event.getType() == EventType.None)
-              && (event.getState() == KeeperState.Disconnected)) {
-            disconnected();
-          }
-        }
-      });
-    }
-
-    private synchronized void disconnected() {
-      if (maybeControl.isPresent()) {
-        LOG.warning("Disconnected from ZooKeeper while leading, committing 
suicide.");
-        try {
-          wrapped.onDefeated(null);
-          maybeControl.get().leave();
-        } catch (UpdateException e) {
-          LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e);
-        } catch (JoinException e) {
-          LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e);
-        } finally {
-          setControl(null);
-        }
-      } else {
-        LOG.info("Disconnected from ZooKeeper, but that's fine because I'm not 
the leader.");
-      }
-    }
-
-    private synchronized void setControl(@Nullable LeaderControl control) {
-      this.maybeControl = Optional.fromNullable(control);
-    }
-
-    @Override public void onLeading(final LeaderControl control) {
-      setControl(control);
-      wrapped.onLeading(new LeaderControl() {
-        @Override public void advertise() throws JoinException, 
InterruptedException {
-          control.advertise();
-        }
-
-        @Override public void leave() throws UpdateException, JoinException {
-          setControl(null);
-          control.leave();
-        }
-      });
-    }
-
-    @Override public void onDefeated(@Nullable EndpointStatus status) {
-      setControl(null);
-      wrapped.onDefeated(status);
-    }
-  }
-
-  /**
-   * A controller for the state of the leader.  This will be provided to the 
leader upon election,
-   * which allows the leader to decide when to advertise in the underlying 
{@link ServerSet} and
-   * terminate leadership at will.
-   */
-  public interface LeaderControl {
-
-    /**
-     * Advertises the leader's server presence to clients.
-     *
-     * @throws JoinException If there was an error advertising.
-     * @throws InterruptedException If interrupted while advertising.
-     */
-    void advertise() throws JoinException, InterruptedException;
-
-    /**
-     * Leaves candidacy for leadership, removing advertised server presence if 
applicable.
-     *
-     * @throws UpdateException If the leader's status could not be updated.
-     * @throws JoinException If there was an error abdicating from leader 
election.
-     */
-    void leave() throws UpdateException, JoinException;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
----------------------------------------------------------------------
diff --git 
a/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java 
b/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
deleted file mode 100644
index 56adcfe..0000000
--- a/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import com.twitter.common.base.Command;
-import com.twitter.common.base.Commands;
-import com.twitter.common.zookeeper.Group.JoinException;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
-
-/**
- * A server set that represents a fixed set of hosts.
- * This may be composed under {@link CompoundServerSet} to ensure a minimum 
set of hosts is
- * present.
- * A static server set does not support joining, but will allow normal join 
calls and status update
- * calls to be made.
- */
-public class StaticServerSet implements ServerSet {
-
-  private static final Logger LOG = 
Logger.getLogger(StaticServerSet.class.getName());
-
-  private static final Function<Endpoint, ServiceInstance> 
ENDPOINT_TO_INSTANCE =
-      new Function<Endpoint, ServiceInstance>() {
-        @Override public ServiceInstance apply(Endpoint endpoint) {
-          return new ServiceInstance(endpoint, ImmutableMap.<String, 
Endpoint>of(), Status.ALIVE);
-        }
-      };
-
-  private final ImmutableSet<ServiceInstance> hosts;
-
-  /**
-   * Creates a static server set that will reply to monitor calls immediately 
and exactly once with
-   * the provided service instances.
-   *
-   * @param hosts Hosts in the static set.
-   */
-  public StaticServerSet(Set<ServiceInstance> hosts) {
-    this.hosts = ImmutableSet.copyOf(hosts);
-  }
-
-  /**
-   * Creates a static server set containing the provided endpoints (and no 
auxiliary ports) which
-   * will all be in the {@link Status#ALIVE} state.
-   *
-   * @param endpoints Endpoints in the static set.
-   * @return A static server set that will advertise the provided endpoints.
-   */
-  public static StaticServerSet fromEndpoints(Set<Endpoint> endpoints) {
-    return new StaticServerSet(
-        ImmutableSet.copyOf(Iterables.transform(endpoints, 
ENDPOINT_TO_INSTANCE)));
-  }
-
-  private EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> auxEndpoints,
-      Optional<Integer> shardId) {
-
-    LOG.warning("Attempt to join fixed server set ignored.");
-    ServiceInstance joining = new ServiceInstance(
-        ServerSets.toEndpoint(endpoint),
-        Maps.transformValues(auxEndpoints, ServerSets.TO_ENDPOINT),
-        Status.ALIVE);
-    if (shardId.isPresent()) {
-      joining.setShard(shardId.get());
-    }
-    if (!hosts.contains(joining)) {
-      LOG.log(Level.SEVERE,
-          "Joining instance " + joining + " does not match any member of the 
static set.");
-    }
-
-    return new EndpointStatus() {
-      @Override public void leave() throws UpdateException {
-        LOG.warning("Attempt to adjust state of fixed server set ignored.");
-      }
-
-      @Override public void update(Status status) throws UpdateException {
-        LOG.warning("Attempt to adjust state of fixed server set ignored.");
-      }
-    };
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> auxEndpoints,
-      Status status) {
-
-    LOG.warning("This method is deprecated. Please do not specify a status 
field.");
-    return join(endpoint, auxEndpoints, Optional.<Integer>absent());
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> auxEndpoints) {
-
-    LOG.warning("Joining a ServerSet without a shard ID is deprecated and will 
soon break.");
-    return join(endpoint, auxEndpoints, Optional.<Integer>absent());
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> auxEndpoints,
-      int shardId) throws JoinException, InterruptedException {
-
-    return join(endpoint, auxEndpoints, Optional.of(shardId));
-  }
-
-  @Override
-  public Command watch(HostChangeMonitor<ServiceInstance> monitor) {
-    monitor.onChange(hosts);
-    return Commands.NOOP;
-  }
-
-  @Override
-  public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws 
MonitorException {
-    watch(monitor);
-  }
-}

Reply via email to