http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/Group.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Group.java b/commons/src/main/java/com/twitter/common/zookeeper/Group.java deleted file mode 100644 index 7a32cd0..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/Group.java +++ /dev/null @@ -1,708 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import java.util.List; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.util.regex.Pattern; - -import javax.annotation.Nullable; - -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.data.ACL; - -import com.twitter.common.base.Command; -import com.twitter.common.base.Commands; -import com.twitter.common.base.ExceptionalSupplier; -import com.twitter.common.base.MorePreconditions; -import com.twitter.common.util.BackoffHelper; -import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; - -/** - * This class exposes methods for joining and monitoring distributed groups. The groups this class - * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for - * each member of a group. - */ -public class Group { - private static final Logger LOG = Logger.getLogger(Group.class.getName()); - - private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null); - private static final String DEFAULT_NODE_NAME_PREFIX = "member_"; - - private final ZooKeeperClient zkClient; - private final ImmutableList<ACL> acl; - private final String path; - - private final NodeScheme nodeScheme; - private final Predicate<String> nodeNameFilter; - - private final BackoffHelper backoffHelper; - - /** - * Creates a group rooted at the given {@code path}. Paths must be absolute and trailing or - * duplicate slashes will be normalized. For example, all the following paths would create a - * group at the normalized path /my/distributed/group: - * <ul> - * <li>/my/distributed/group - * <li>/my/distributed/group/ - * <li>/my/distributed//group - * </ul> - * - * @param zkClient the client to use for interactions with ZooKeeper - * @param acl the ACL to use for creating the persistent group path if it does not already exist - * @param path the absolute persistent path that represents this group - * @param nodeScheme the scheme that defines how nodes are created - */ - public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) { - this.zkClient = Preconditions.checkNotNull(zkClient); - this.acl = ImmutableList.copyOf(acl); - this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path)); - - this.nodeScheme = Preconditions.checkNotNull(nodeScheme); - nodeNameFilter = new Predicate<String>() { - @Override public boolean apply(String nodeName) { - return Group.this.nodeScheme.isMember(nodeName); - } - }; - - backoffHelper = new BackoffHelper(); - } - - /** - * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a - * {@code namePrefix} of 'member_'. - */ - public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) { - this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX); - } - - /** - * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a - * {@link DefaultScheme} using {@code namePrefix}. - */ - public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) { - this(zkClient, acl, path, new DefaultScheme(namePrefix)); - } - - public String getMemberPath(String memberId) { - return path + "/" + MorePreconditions.checkNotBlank(memberId); - } - - public String getPath() { - return path; - } - - public String getMemberId(String nodePath) { - MorePreconditions.checkNotBlank(nodePath); - Preconditions.checkArgument(nodePath.startsWith(path + "/"), - "Not a member of this group[%s]: %s", path, nodePath); - - String memberId = StringUtils.substringAfterLast(nodePath, "/"); - Preconditions.checkArgument(nodeScheme.isMember(memberId), - "Not a group member: %s", memberId); - return memberId; - } - - /** - * Returns the current list of group member ids by querying ZooKeeper synchronously. - * - * @return the ids of all the present members of this group - * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper - * @throws KeeperException if there was a problem reading this group's member ids - * @throws InterruptedException if this thread is interrupted listing the group members - */ - public Iterable<String> getMemberIds() - throws ZooKeeperConnectionException, KeeperException, InterruptedException { - return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter); - } - - /** - * Gets the data for one of this groups members by querying ZooKeeper synchronously. - * - * @param memberId the id of the member whose data to retrieve - * @return the data associated with the {@code memberId} - * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper - * @throws KeeperException if there was a problem reading this member's data - * @throws InterruptedException if this thread is interrupted retrieving the member data - */ - public byte[] getMemberData(String memberId) - throws ZooKeeperConnectionException, KeeperException, InterruptedException { - return zkClient.get().getData(getMemberPath(memberId), false, null); - } - - /** - * Represents membership in a distributed group. - */ - public interface Membership { - - /** - * Returns the persistent ZooKeeper path that represents this group. - */ - String getGroupPath(); - - /** - * Returns the id (ZooKeeper node name) of this group member. May change over time if the - * ZooKeeper session expires. - */ - String getMemberId(); - - /** - * Returns the full ZooKeeper path to this group member. May change over time if the - * ZooKeeper session expires. - */ - String getMemberPath(); - - /** - * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to - * {@link Group#join()}. - * - * @return the new membership data - * @throws UpdateException if there was a problem updating the membership data - */ - byte[] updateMemberData() throws UpdateException; - - /** - * Cancels group membership by deleting the associated ZooKeeper member node. - * - * @throws JoinException if there is a problem deleting the node - */ - void cancel() throws JoinException; - } - - /** - * Indicates an error joining a group. - */ - public static class JoinException extends Exception { - public JoinException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Indicates an error updating a group member's data. - */ - public static class UpdateException extends Exception { - public UpdateException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Equivalent to calling {@code join(null, null)}. - */ - public final Membership join() throws JoinException, InterruptedException { - return join(NO_MEMBER_DATA, null); - } - - /** - * Equivalent to calling {@code join(memberData, null)}. - */ - public final Membership join(Supplier<byte[]> memberData) - throws JoinException, InterruptedException { - - return join(memberData, null); - } - - /** - * Equivalent to calling {@code join(null, onLoseMembership)}. - */ - public final Membership join(@Nullable final Command onLoseMembership) - throws JoinException, InterruptedException { - - return join(NO_MEMBER_DATA, onLoseMembership); - } - - /** - * Joins this group and returns the resulting Membership when successful. Membership will be - * automatically cancelled when the current jvm process dies; however the returned Membership - * object can be used to cancel membership earlier. Unless - * {@link com.twitter.common.zookeeper.Group.Membership#cancel()} is called the membership will - * be maintained by re-establishing it silently in the background. - * - * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper. If an - * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses - * membership in the group. - * - * @param memberData a supplier of the data to store in the member node - * @param onLoseMembership a callback to notify when membership is lost - * @return a Membership object with the member details - * @throws JoinException if there was a problem joining the group - * @throws InterruptedException if this thread is interrupted awaiting completion of the join - */ - public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) - throws JoinException, InterruptedException { - - Preconditions.checkNotNull(memberData); - ensurePersistentGroupPath(); - - final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership); - return backoffHelper.doUntilResult(new ExceptionalSupplier<Membership, JoinException>() { - @Override public Membership get() throws JoinException { - try { - return groupJoiner.join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new JoinException("Interrupted trying to join group at path: " + path, e); - } catch (ZooKeeperConnectionException e) { - LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e); - return null; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e); - return null; - } else { - throw new JoinException("Problem joining partition group at path: " + path, e); - } - } - } - }); - } - - private void ensurePersistentGroupPath() throws JoinException, InterruptedException { - backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() { - @Override public Boolean get() throws JoinException { - try { - ZooKeeperUtils.ensurePath(zkClient, acl, path); - return true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new JoinException("Interrupted trying to ensure group at path: " + path, e); - } catch (ZooKeeperConnectionException e) { - LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e); - return false; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.log(Level.WARNING, "Temporary error ensuring path: " + path, e); - return false; - } else { - throw new JoinException("Problem ensuring group at path: " + path, e); - } - } - } - }); - } - - private class ActiveMembership implements Membership { - private final Supplier<byte[]> memberData; - private final Command onLoseMembership; - private String nodePath; - private String memberId; - private volatile boolean cancelled; - private byte[] membershipData; - - public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) { - this.memberData = memberData; - this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership; - } - - @Override - public String getGroupPath() { - return path; - } - - @Override - public synchronized String getMemberId() { - return memberId; - } - - @Override - public synchronized String getMemberPath() { - return nodePath; - } - - @Override - public synchronized byte[] updateMemberData() throws UpdateException { - byte[] membershipData = memberData.get(); - if (!ArrayUtils.isEquals(this.membershipData, membershipData)) { - try { - zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION); - this.membershipData = membershipData; - } catch (KeeperException e) { - throw new UpdateException("Problem updating membership data.", e); - } catch (InterruptedException e) { - throw new UpdateException("Interrupted attempting to update membership data.", e); - } catch (ZooKeeperConnectionException e) { - throw new UpdateException( - "Could not connect to the ZooKeeper cluster to update membership data.", e); - } - } - return membershipData; - } - - @Override - public synchronized void cancel() throws JoinException { - if (!cancelled) { - try { - backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() { - @Override public Boolean get() throws JoinException { - try { - zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION); - return true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e); - } catch (ZooKeeperConnectionException e) { - LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e); - return false; - } catch (NoNodeException e) { - LOG.info("Membership already cancelled, node at path: " + nodePath + - " has been deleted"); - return true; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.log(Level.WARNING, "Temporary error cancelling membership: " + nodePath, e); - return false; - } else { - throw new JoinException("Problem cancelling membership: " + nodePath, e); - } - } - } - }); - cancelled = true; // Prevent auto-re-join logic from undoing this cancel. - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new JoinException("Problem cancelling membership: " + nodePath, e); - } - } - } - - private class CancelledException extends IllegalStateException { /* marker */ } - - synchronized Membership join() - throws ZooKeeperConnectionException, InterruptedException, KeeperException { - - if (cancelled) { - throw new CancelledException(); - } - - if (nodePath == null) { - // Re-join if our ephemeral node goes away due to session expiry - only needs to be - // registered once. - zkClient.registerExpirationHandler(new Command() { - @Override public void execute() { - tryJoin(); - } - }); - } - - byte[] membershipData = memberData.get(); - String nodeName = nodeScheme.createName(membershipData); - CreateMode createMode = nodeScheme.isSequential() - ? CreateMode.EPHEMERAL_SEQUENTIAL - : CreateMode.EPHEMERAL; - nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode); - memberId = Group.this.getMemberId(nodePath); - LOG.info("Set group member ID to " + memberId); - this.membershipData = membershipData; - - // Re-join if our ephemeral node goes away due to maliciousness. - zkClient.get().exists(nodePath, new Watcher() { - @Override public void process(WatchedEvent event) { - if (event.getType() == EventType.NodeDeleted) { - tryJoin(); - } - } - }); - - return this; - } - - private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin = - new ExceptionalSupplier<Boolean, InterruptedException>() { - @Override public Boolean get() throws InterruptedException { - try { - join(); - return true; - } catch (CancelledException e) { - // Lost a cancel race - that's ok. - return true; - } catch (ZooKeeperConnectionException e) { - LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e); - return false; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.log(Level.WARNING, "Temporary error re-joining group: " + path, e); - return false; - } else { - throw new IllegalStateException("Permanent problem re-joining group: " + path, e); - } - } - } - }; - - private synchronized void tryJoin() { - onLoseMembership.execute(); - try { - backoffHelper.doUntilSuccess(tryJoin); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - String.format("Interrupted while trying to re-join group: %s, giving up", path), e); - } - } - } - - /** - * An interface to an object that listens for changes to a group's membership. - */ - public interface GroupChangeListener { - - /** - * Called whenever group membership changes with the new list of member ids. - * - * @param memberIds the current member ids - */ - void onGroupChange(Iterable<String> memberIds); - } - - /** - * An interface that dictates the scheme to use for storing and filtering nodes that represent - * members of a distributed group. - */ - public interface NodeScheme { - /** - * Determines if a child node is a member of a group by examining the node's name. - * - * @param nodeName the name of a child node found in a group - * @return {@code true} if {@code nodeName} identifies a group member in this scheme - */ - boolean isMember(String nodeName); - - /** - * Generates a node name for the node representing this process in the distributed group. - * - * @param membershipData the data that will be stored in this node - * @return the name for the node that will represent this process in the group - */ - String createName(byte[] membershipData); - - /** - * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes. - * - * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise - */ - boolean isSequential(); - } - - /** - * Indicates an error watching a group. - */ - public static class WatchException extends Exception { - public WatchException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * Watches this group for the lifetime of this jvm process. This method will block until the - * current group members are available, notify the {@code groupChangeListener} and then return. - * All further changes to the group membership will cause notifications on a background thread. - * - * @param groupChangeListener the listener to notify of group membership change events - * @return A command which, when executed, will stop watching the group. - * @throws WatchException if there is a problem generating the 1st group membership list - * @throws InterruptedException if interrupted waiting to gather the 1st group membership list - */ - public final Command watch(final GroupChangeListener groupChangeListener) - throws WatchException, InterruptedException { - Preconditions.checkNotNull(groupChangeListener); - - try { - ensurePersistentGroupPath(); - } catch (JoinException e) { - throw new WatchException("Failed to create group path: " + path, e); - } - - final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener); - backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, WatchException>() { - @Override public Boolean get() throws WatchException { - try { - groupMonitor.watchGroup(); - return true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new WatchException("Interrupted trying to watch group at path: " + path, e); - } catch (ZooKeeperConnectionException e) { - LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e); - return null; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e); - return null; - } else { - throw new WatchException("Problem trying to watch group at path: " + path, e); - } - } - } - }); - return new Command() { - @Override public void execute() { - groupMonitor.stopWatching(); - } - }; - } - - /** - * Helps continuously monitor a group for membership changes. - */ - private class GroupMonitor { - private final GroupChangeListener groupChangeListener; - private volatile boolean stopped = false; - private Set<String> members; - - GroupMonitor(GroupChangeListener groupChangeListener) { - this.groupChangeListener = groupChangeListener; - } - - private final Watcher groupWatcher = new Watcher() { - @Override public final void process(WatchedEvent event) { - if (event.getType() == EventType.NodeChildrenChanged) { - tryWatchGroup(); - } - } - }; - - private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup = - new ExceptionalSupplier<Boolean, InterruptedException>() { - @Override public Boolean get() throws InterruptedException { - try { - watchGroup(); - return true; - } catch (ZooKeeperConnectionException e) { - LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e); - return false; - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.log(Level.WARNING, "Temporary error re-watching group: " + path, e); - return false; - } else { - throw new IllegalStateException("Permanent problem re-watching group: " + path, e); - } - } - } - }; - - private void tryWatchGroup() { - if (stopped) { - return; - } - - try { - backoffHelper.doUntilSuccess(tryWatchGroup); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - String.format("Interrupted while trying to re-watch group: %s, giving up", path), e); - } - } - - private void watchGroup() - throws ZooKeeperConnectionException, InterruptedException, KeeperException { - - if (stopped) { - return; - } - - List<String> children = zkClient.get().getChildren(path, groupWatcher); - setMembers(Iterables.filter(children, nodeNameFilter)); - } - - private void stopWatching() { - // TODO(William Farner): Cancel the watch when - // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved. - LOG.info("Stopping watch on " + this); - stopped = true; - } - - synchronized void setMembers(Iterable<String> members) { - if (stopped) { - LOG.info("Suppressing membership update, no longer watching " + this); - return; - } - - if (this.members == null) { - // Reset our watch on the group if session expires - only needs to be registered once. - zkClient.registerExpirationHandler(new Command() { - @Override public void execute() { - tryWatchGroup(); - } - }); - } - - Set<String> membership = ImmutableSet.copyOf(members); - if (!membership.equals(this.members)) { - groupChangeListener.onGroupChange(members); - this.members = membership; - } - } - } - - /** - * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] + - * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the - * prefix is "member_", the node's full path will look something like - * {@code /discovery/servicename/member_0000000007}. - */ - public static class DefaultScheme implements NodeScheme { - private final String namePrefix; - private final Pattern namePattern; - - /** - * Creates a sequential node scheme based on the given node name prefix. - * - * @param namePrefix the prefix for the names of the member nodes - */ - public DefaultScheme(String namePrefix) { - this.namePrefix = MorePreconditions.checkNotBlank(namePrefix); - namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$"); - } - - @Override - public boolean isMember(String nodeName) { - return namePattern.matcher(nodeName).matches(); - } - - @Override - public String createName(byte[] membershipData) { - return namePrefix; - } - - @Override - public boolean isSequential() { - return true; - } - } - - @Override - public String toString() { - return "Group " + path; - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java b/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java deleted file mode 100644 index 4cdf5f2..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/Partitioner.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.Ordering; -import com.twitter.common.zookeeper.Group.GroupChangeListener; -import com.twitter.common.zookeeper.Group.JoinException; -import com.twitter.common.zookeeper.Group.Membership; -import com.twitter.common.zookeeper.Group.UpdateException; -import com.twitter.common.zookeeper.Group.WatchException; -import org.apache.zookeeper.data.ACL; - -import javax.annotation.Nullable; -import java.util.List; -import java.util.logging.Logger; - -/** - * A distributed mechanism for eventually arriving at an evenly partitioned space of long values. - * A typical usage would have a client on each of several hosts joining a logical partition (a - * "partition group") that represents some shared work. Clients could then process a subset of a - * full body of work by testing any given item of work with their partition filter. - * - * <p>Note that clients must be able to tolerate periods of duplicate processing by more than 1 - * partition as explained in {@link #join()}. - * - * @author John Sirois - */ -public class Partitioner { - - private static final Logger LOG = Logger.getLogger(Partitioner.class.getName()); - - private volatile int groupSize; - private volatile int groupIndex; - private final Group group; - - /** - * Constructs a representation of a partition group but does not join it. Note that the partition - * group path will be created as a persistent zookeeper path if it does not already exist. - * - * @param zkClient a client to use for joining the partition group and watching its membership - * @param acl the acl for this partition group - * @param path a zookeeper path that represents the partition group - */ - public Partitioner(ZooKeeperClient zkClient, List<ACL> acl, String path) { - group = new Group(zkClient, acl, path); - } - - @VisibleForTesting - int getGroupSize() { - return groupSize; - } - - /** - * Represents a slice of a partition group. The partition is dynamic and will adjust its size as - * members join and leave its partition group. - */ - public abstract static class Partition implements Predicate<Long>, Membership { - - /** - * Returns {@code true} if the given {@code value} is a member of this partition at this time. - */ - public abstract boolean isMember(long value); - - /** - * Gets number of members in the group at this time. - * - * @return number of members in the ZK group at this time. - */ - public abstract int getNumPartitions(); - - /** - * Evaluates partition membership based on the given {@code value}'s hash code. If the value - * is null it is never a member of a partition. - */ - boolean isMember(Object value) { - return (value != null) && isMember(value.hashCode()); - } - - /** - * Equivalent to {@link #isMember(long)} for all non-null values; however incurs unboxing - * overhead. - */ - @Override - public boolean apply(@Nullable Long input) { - return (input != null) && isMember(input); - } - } - - /** - * Attempts to join the partition group and claim a slice. When successful, a predicate is - * returned that can be used to test whether or not an item belongs to this partition. The - * predicate is dynamic such that as the group is further partitioned or partitions merge the - * predicate will claim a narrower or wider swath of the partition space respectively. Partition - * creation and merging is not instantaneous and clients should expect independent partitions to - * claim ownership of some items when partition membership is in flux. It is only in the steady - * state that a client should expect independent partitions to divide the partition space evenly - * and without overlap. - * - * <p>TODO(John Sirois): consider adding a version with a global timeout for the join operation. - * - * @return the partition representing the slice of the partition group this member can claim - * @throws JoinException if there was a problem joining the partition group - * @throws InterruptedException if interrupted while waiting to join the partition group - */ - public final Partition join() throws JoinException, InterruptedException { - final Membership membership = group.join(); - try { - group.watch(createGroupChangeListener(membership)); - } catch (WatchException e) { - membership.cancel(); - throw new JoinException("Problem establishing watch on group after joining it", e); - } - return new Partition() { - @Override public boolean isMember(long value) { - return (value % groupSize) == groupIndex; - } - - @Override public int getNumPartitions() { - return groupSize; - } - - @Override public String getGroupPath() { - return membership.getGroupPath(); - } - - @Override public String getMemberId() { - return membership.getMemberId(); - } - - @Override public String getMemberPath() { - return membership.getMemberPath(); - } - - @Override public byte[] updateMemberData() throws UpdateException { - return membership.updateMemberData(); - } - - @Override public void cancel() throws JoinException { - membership.cancel(); - } - }; - } - - @VisibleForTesting GroupChangeListener createGroupChangeListener(final Membership membership) { - return new GroupChangeListener() { - @Override public void onGroupChange(Iterable<String> memberIds) { - List<String> members = Ordering.natural().sortedCopy(memberIds); - int newSize = members.size(); - int newIndex = members.indexOf(membership.getMemberId()); - - LOG.info(String.format("Rebuilding group %s:%s [%d:%d]->[%d:%d]", - membership.getGroupPath(), members, groupSize, groupIndex, newSize, newIndex)); - - groupSize = newSize; - groupIndex = newIndex; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java deleted file mode 100644 index 2021f51..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/ServerSet.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import com.twitter.common.net.pool.DynamicHostSet; -import com.twitter.common.zookeeper.Group.JoinException; -import com.twitter.thrift.ServiceInstance; -import com.twitter.thrift.Status; - -import java.net.InetSocketAddress; -import java.util.Map; - -/** - * A logical set of servers registered in ZooKeeper. Intended to be used by both servers in a - * common service and their clients. - * - * TODO(William Farner): Explore decoupling this from thrift. - */ -public interface ServerSet extends DynamicHostSet<ServiceInstance> { - - /** - * Attempts to join a server set for this logical service group. - * - * @param endpoint the primary service endpoint - * @param additionalEndpoints and additional endpoints keyed by their logical name - * @param status the current service status - * @return an EndpointStatus object that allows the endpoint to adjust its status - * @throws JoinException if there was a problem joining the server set - * @throws InterruptedException if interrupted while waiting to join the server set - * @deprecated The status field is deprecated. Please use {@link #join(InetSocketAddress, Map)} - */ - @Deprecated - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - Status status) throws JoinException, InterruptedException; - - /** - * Attempts to join a server set for this logical service group. - * - * @param endpoint the primary service endpoint - * @param additionalEndpoints and additional endpoints keyed by their logical name - * @return an EndpointStatus object that allows the endpoint to adjust its status - * @throws JoinException if there was a problem joining the server set - * @throws InterruptedException if interrupted while waiting to join the server set - */ - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints) - throws JoinException, InterruptedException; - - /** - * Attempts to join a server set for this logical service group. - * - * @param endpoint the primary service endpoint - * @param additionalEndpoints and additional endpoints keyed by their logical name - * @param shardId Unique shard identifier for this member of the service. - * @return an EndpointStatus object that allows the endpoint to adjust its status - * @throws JoinException if there was a problem joining the server set - * @throws InterruptedException if interrupted while waiting to join the server set - */ - EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - int shardId) throws JoinException, InterruptedException; - - /** - * A handle to a service endpoint's status data that allows updating it to track current events. - */ - public interface EndpointStatus { - - /** - * Attempts to update the status of the service endpoint associated with this endpoint. If the - * {@code status} is {@link Status#DEAD} then the endpoint will be removed from the server set. - * - * @param status the current status of the endpoint - * @throws UpdateException if there was a problem writing the update - * @deprecated Support for mutable status is deprecated. Please use {@link #leave()} - */ - @Deprecated - void update(Status status) throws UpdateException; - - /** - * Removes the endpoint from the server set. - * - * @throws UpdateException if there was a problem leaving the ServerSet. - */ - void leave() throws UpdateException; - } - - /** - * Indicates an error updating a service's status information. - */ - public static class UpdateException extends Exception { - public UpdateException(String message, Throwable cause) { - super(message, cause); - } - - public UpdateException(String message) { - super(message); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java deleted file mode 100644 index b00015e..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/ServerSetImpl.java +++ /dev/null @@ -1,606 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.net.InetSocketAddress; -import java.nio.charset.Charset; -import java.util.Map; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.Nullable; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.base.Throwables; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSortedSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.collect.Sets.SetView; -import com.google.common.util.concurrent.UncheckedExecutionException; -import com.google.gson.Gson; - -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; - -import com.twitter.common.args.Arg; -import com.twitter.common.args.CmdLine; -import com.twitter.common.base.Command; -import com.twitter.common.base.Function; -import com.twitter.common.base.Supplier; -import com.twitter.common.io.Codec; -import com.twitter.common.io.CompatibilityCodec; -import com.twitter.common.io.ThriftCodec; -import com.twitter.common.util.BackoffHelper; -import com.twitter.common.zookeeper.Group.GroupChangeListener; -import com.twitter.common.zookeeper.Group.JoinException; -import com.twitter.common.zookeeper.Group.Membership; -import com.twitter.common.zookeeper.Group.WatchException; -import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; -import com.twitter.thrift.Endpoint; -import com.twitter.thrift.ServiceInstance; -import com.twitter.thrift.Status; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * ZooKeeper-backed implementation of {@link ServerSet}. - */ -public class ServerSetImpl implements ServerSet { - private static final Logger LOG = Logger.getLogger(ServerSetImpl.class.getName()); - - @CmdLine(name = "serverset_encode_json", - help = "If true, use JSON for encoding server set information." - + " Defaults to true (use JSON).") - private static final Arg<Boolean> ENCODE_JSON = Arg.create(true); - - private final ZooKeeperClient zkClient; - private final Group group; - private final Codec<ServiceInstance> codec; - private final BackoffHelper backoffHelper; - - /** - * Creates a new ServerSet using open ZooKeeper node ACLs. - * - * @param zkClient the client to use for interactions with ZooKeeper - * @param path the name-service path of the service to connect to - */ - public ServerSetImpl(ZooKeeperClient zkClient, String path) { - this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path); - } - - /** - * Creates a new ServerSet for the given service {@code path}. - * - * @param zkClient the client to use for interactions with ZooKeeper - * @param acl the ACL to use for creating the persistent group path if it does not already exist - * @param path the name-service path of the service to connect to - */ - public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) { - this(zkClient, new Group(zkClient, acl, path), createDefaultCodec()); - } - - /** - * Creates a new ServerSet using the given service {@code group}. - * - * @param zkClient the client to use for interactions with ZooKeeper - * @param group the server group - */ - public ServerSetImpl(ZooKeeperClient zkClient, Group group) { - this(zkClient, group, createDefaultCodec()); - } - - /** - * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}. - * - * @param zkClient the client to use for interactions with ZooKeeper - * @param group the server group - * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and - * from a byte array - */ - public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) { - this.zkClient = checkNotNull(zkClient); - this.group = checkNotNull(group); - this.codec = checkNotNull(codec); - - // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable. - backoffHelper = new BackoffHelper(); - } - - @VisibleForTesting - ZooKeeperClient getZkClient() { - return zkClient; - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints) - throws JoinException, InterruptedException { - - LOG.log(Level.WARNING, - "Joining a ServerSet without a shard ID is deprecated and will soon break."); - return join(endpoint, additionalEndpoints, Optional.<Integer>absent()); - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - int shardId) throws JoinException, InterruptedException { - - return join(endpoint, additionalEndpoints, Optional.of(shardId)); - } - - private EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - Optional<Integer> shardId) throws JoinException, InterruptedException { - - checkNotNull(endpoint); - checkNotNull(additionalEndpoints); - - final MemberStatus memberStatus = - new MemberStatus(endpoint, additionalEndpoints, shardId); - Supplier<byte[]> serviceInstanceSupplier = new Supplier<byte[]>() { - @Override public byte[] get() { - return memberStatus.serializeServiceInstance(); - } - }; - final Membership membership = group.join(serviceInstanceSupplier); - - return new EndpointStatus() { - @Override public void update(Status status) throws UpdateException { - checkNotNull(status); - LOG.warning("This method is deprecated. Please use leave() instead."); - if (status == Status.DEAD) { - leave(); - } else { - LOG.warning("Status update has been ignored"); - } - } - - @Override public void leave() throws UpdateException { - memberStatus.leave(membership); - } - }; - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - Status status) throws JoinException, InterruptedException { - - LOG.warning("This method is deprecated. Please do not specify a status field."); - if (status != Status.ALIVE) { - LOG.severe("**************************************************************************\n" - + "WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.\n" - + "JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status - + "\n**************************************************************************"); - } - return join(endpoint, additionalEndpoints); - } - - @Override - public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { - ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor); - try { - return serverSetWatcher.watch(); - } catch (WatchException e) { - throw new MonitorException("ZooKeeper watch failed.", e); - } catch (InterruptedException e) { - throw new MonitorException("Interrupted while watching ZooKeeper.", e); - } - } - - @Override - public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { - LOG.warning("This method is deprecated. Please use watch instead."); - watch(monitor); - } - - private class MemberStatus { - private final InetSocketAddress endpoint; - private final Map<String, InetSocketAddress> additionalEndpoints; - private final Optional<Integer> shardId; - - private MemberStatus( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> additionalEndpoints, - Optional<Integer> shardId) { - - this.endpoint = endpoint; - this.additionalEndpoints = additionalEndpoints; - this.shardId = shardId; - } - - synchronized void leave(Membership membership) throws UpdateException { - try { - membership.cancel(); - } catch (JoinException e) { - throw new UpdateException( - "Failed to auto-cancel group membership on transition to DEAD status", e); - } - } - - byte[] serializeServiceInstance() { - ServiceInstance serviceInstance = new ServiceInstance( - ServerSets.toEndpoint(endpoint), - Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT), - Status.ALIVE); - - if (shardId.isPresent()) { - serviceInstance.setShard(shardId.get()); - } - - LOG.fine("updating endpoint data to:\n\t" + serviceInstance); - try { - return ServerSets.serializeServiceInstance(serviceInstance, codec); - } catch (IOException e) { - throw new IllegalStateException("Unexpected problem serializing thrift struct " + - serviceInstance + "to a byte[]", e); - } - } - } - - private static class ServiceInstanceFetchException extends RuntimeException { - ServiceInstanceFetchException(String message, Throwable cause) { - super(message, cause); - } - } - - private static class ServiceInstanceDeletedException extends RuntimeException { - ServiceInstanceDeletedException(String path) { - super(path); - } - } - - private class ServerSetWatcher { - private final ZooKeeperClient zkClient; - private final HostChangeMonitor<ServiceInstance> monitor; - @Nullable private ImmutableSet<ServiceInstance> serverSet; - - ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) { - this.zkClient = zkClient; - this.monitor = monitor; - } - - public Command watch() throws WatchException, InterruptedException { - Watcher onExpirationWatcher = zkClient.registerExpirationHandler(new Command() { - @Override public void execute() { - // Servers may have changed Status while we were disconnected from ZooKeeper, check and - // re-register our node watches. - rebuildServerSet(); - } - }); - - try { - return group.watch(new GroupChangeListener() { - @Override public void onGroupChange(Iterable<String> memberIds) { - notifyGroupChange(memberIds); - } - }); - } catch (WatchException e) { - zkClient.unregister(onExpirationWatcher); - throw e; - } catch (InterruptedException e) { - zkClient.unregister(onExpirationWatcher); - throw e; - } - } - - private ServiceInstance getServiceInstance(final String nodePath) { - try { - return backoffHelper.doUntilResult(new Supplier<ServiceInstance>() { - @Override public ServiceInstance get() { - try { - byte[] data = zkClient.get().getData(nodePath, false, null); - return ServerSets.deserializeServiceInstance(data, codec); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ServiceInstanceFetchException( - "Interrupted updating service data for: " + nodePath, e); - } catch (ZooKeeperConnectionException e) { - LOG.log(Level.WARNING, - "Temporary error trying to updating service data for: " + nodePath, e); - return null; - } catch (NoNodeException e) { - invalidateNodePath(nodePath); - throw new ServiceInstanceDeletedException(nodePath); - } catch (KeeperException e) { - if (zkClient.shouldRetry(e)) { - LOG.log(Level.WARNING, - "Temporary error trying to update service data for: " + nodePath, e); - return null; - } else { - throw new ServiceInstanceFetchException( - "Failed to update service data for: " + nodePath, e); - } - } catch (IOException e) { - throw new ServiceInstanceFetchException( - "Failed to deserialize the ServiceInstance data for: " + nodePath, e); - } - } - }); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ServiceInstanceFetchException( - "Interrupted trying to update service data for: " + nodePath, e); - } - } - - private final LoadingCache<String, ServiceInstance> servicesByMemberId = - CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() { - @Override public ServiceInstance load(String memberId) { - return getServiceInstance(group.getMemberPath(memberId)); - } - }); - - private void rebuildServerSet() { - Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet()); - servicesByMemberId.invalidateAll(); - notifyGroupChange(memberIds); - } - - private String invalidateNodePath(String deletedPath) { - String memberId = group.getMemberId(deletedPath); - servicesByMemberId.invalidate(memberId); - return memberId; - } - - private final Function<String, ServiceInstance> MAYBE_FETCH_NODE = - new Function<String, ServiceInstance>() { - @Override public ServiceInstance apply(String memberId) { - // This get will trigger a fetch - try { - return servicesByMemberId.getUnchecked(memberId); - } catch (UncheckedExecutionException e) { - Throwable cause = e.getCause(); - if (!(cause instanceof ServiceInstanceDeletedException)) { - Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class); - throw new IllegalStateException( - "Unexpected error fetching member data for: " + memberId, e); - } - return null; - } - } - }; - - private synchronized void notifyGroupChange(Iterable<String> memberIds) { - ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds); - Set<String> existingMemberIds = servicesByMemberId.asMap().keySet(); - - // Ignore no-op state changes except for the 1st when we've seen no group yet. - if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) { - SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds); - // Implicit removal from servicesByMemberId. - existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds)); - - Iterable<ServiceInstance> serviceInstances = Iterables.filter( - Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull()); - - notifyServerSetChange(ImmutableSet.copyOf(serviceInstances)); - } - } - - private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) { - // ZK nodes may have changed if there was a session expiry for a server in the server set, but - // if the server's status has not changed, we can skip any onChange updates. - if (!currentServerSet.equals(serverSet)) { - if (currentServerSet.isEmpty()) { - LOG.warning("server set empty for path " + group.getPath()); - } else { - if (LOG.isLoggable(Level.INFO)) { - if (serverSet == null) { - LOG.info("received initial membership " + currentServerSet); - } else { - logChange(Level.INFO, currentServerSet); - } - } - } - serverSet = currentServerSet; - monitor.onChange(serverSet); - } - } - - private void logChange(Level level, ImmutableSet<ServiceInstance> newServerSet) { - StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: "); - if (serverSet.size() != newServerSet.size()) { - message.append("from ").append(serverSet.size()) - .append(" members to ").append(newServerSet.size()); - } - - Joiner joiner = Joiner.on("\n\t\t"); - - SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet); - if (!left.isEmpty()) { - message.append("\n\tleft:\n\t\t").append(joiner.join(left)); - } - - SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet); - if (!joined.isEmpty()) { - message.append("\n\tjoined:\n\t\t").append(joiner.join(joined)); - } - - LOG.log(level, message.toString()); - } - } - - private static class EndpointSchema { - final String host; - final Integer port; - - EndpointSchema(Endpoint endpoint) { - Preconditions.checkNotNull(endpoint); - this.host = endpoint.getHost(); - this.port = endpoint.getPort(); - } - - String getHost() { - return host; - } - - Integer getPort() { - return port; - } - } - - private static class ServiceInstanceSchema { - final EndpointSchema serviceEndpoint; - final Map<String, EndpointSchema> additionalEndpoints; - final Status status; - final Integer shard; - - ServiceInstanceSchema(ServiceInstance instance) { - this.serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint()); - if (instance.getAdditionalEndpoints() != null) { - this.additionalEndpoints = Maps.transformValues( - instance.getAdditionalEndpoints(), - new Function<Endpoint, EndpointSchema>() { - @Override public EndpointSchema apply(Endpoint endpoint) { - return new EndpointSchema(endpoint); - } - } - ); - } else { - this.additionalEndpoints = Maps.newHashMap(); - } - this.status = instance.getStatus(); - this.shard = instance.isSetShard() ? instance.getShard() : null; - } - - EndpointSchema getServiceEndpoint() { - return serviceEndpoint; - } - - Map<String, EndpointSchema> getAdditionalEndpoints() { - return additionalEndpoints; - } - - Status getStatus() { - return status; - } - - Integer getShard() { - return shard; - } - } - - /** - * An adapted JSON codec that makes use of {@link ServiceInstanceSchema} to circumvent the - * __isset_bit_vector internal thrift struct field that tracks primitive types. - */ - private static class AdaptedJsonCodec implements Codec<ServiceInstance> { - private static final Charset ENCODING = Charsets.UTF_8; - private static final Class<ServiceInstanceSchema> CLASS = ServiceInstanceSchema.class; - private final Gson gson = new Gson(); - - @Override - public void serialize(ServiceInstance instance, OutputStream sink) throws IOException { - Writer w = new OutputStreamWriter(sink, ENCODING); - gson.toJson(new ServiceInstanceSchema(instance), CLASS, w); - w.flush(); - } - - @Override - public ServiceInstance deserialize(InputStream source) throws IOException { - ServiceInstanceSchema output = gson.fromJson(new InputStreamReader(source, ENCODING), CLASS); - Endpoint primary = new Endpoint( - output.getServiceEndpoint().getHost(), output.getServiceEndpoint().getPort()); - Map<String, Endpoint> additional = Maps.transformValues( - output.getAdditionalEndpoints(), - new Function<EndpointSchema, Endpoint>() { - @Override public Endpoint apply(EndpointSchema endpoint) { - return new Endpoint(endpoint.getHost(), endpoint.getPort()); - } - } - ); - ServiceInstance instance = - new ServiceInstance(primary, ImmutableMap.copyOf(additional), output.getStatus()); - if (output.getShard() != null) { - instance.setShard(output.getShard()); - } - return instance; - } - } - - private static Codec<ServiceInstance> createCodec(final boolean useJsonEncoding) { - final Codec<ServiceInstance> json = new AdaptedJsonCodec(); - final Codec<ServiceInstance> thrift = - ThriftCodec.create(ServiceInstance.class, ThriftCodec.BINARY_PROTOCOL); - final Predicate<byte[]> recognizer = new Predicate<byte[]>() { - public boolean apply(byte[] input) { - return (input.length > 1 && input[0] == '{' && input[1] == '\"') == useJsonEncoding; - } - }; - - if (useJsonEncoding) { - return CompatibilityCodec.create(json, thrift, 2, recognizer); - } - return CompatibilityCodec.create(thrift, json, 2, recognizer); - } - - /** - * Creates a codec for {@link ServiceInstance} objects that uses Thrift binary encoding, and can - * decode both Thrift and JSON encodings. - * - * @return a new codec instance. - */ - public static Codec<ServiceInstance> createThriftCodec() { - return createCodec(false); - } - - /** - * Creates a codec for {@link ServiceInstance} objects that uses JSON encoding, and can decode - * both Thrift and JSON encodings. - * - * @return a new codec instance. - */ - public static Codec<ServiceInstance> createJsonCodec() { - return createCodec(true); - } - - /** - * Returns a codec for {@link ServiceInstance} objects that uses either the Thrift or the JSON - * encoding, depending on whether the command line argument <tt>serverset_json_encofing</tt> is - * set to <tt>true</tt>, and can decode both Thrift and JSON encodings. - * - * @return a new codec instance. - */ - public static Codec<ServiceInstance> createDefaultCodec() { - return createCodec(ENCODE_JSON.get()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java b/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java deleted file mode 100644 index fc7ea88..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/ServerSets.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.Set; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import org.apache.zookeeper.data.ACL; - -import com.twitter.common.base.Function; -import com.twitter.common.base.MorePreconditions; -import com.twitter.common.io.Codec; -import com.twitter.thrift.Endpoint; -import com.twitter.thrift.ServiceInstance; -import com.twitter.thrift.Status; - -/** - * Common ServerSet related functions - */ -public class ServerSets { - - private ServerSets() { - // Utility class. - } - - /** - * A function that invokes {@link #toEndpoint(InetSocketAddress)}. - */ - public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT = - new Function<InetSocketAddress, Endpoint>() { - @Override public Endpoint apply(InetSocketAddress address) { - return ServerSets.toEndpoint(address); - } - }; - - /** - * Creates a server set that registers at a single path applying the given ACL to all nodes - * created in the path. - * - * @param zkClient ZooKeeper client to register with. - * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates. - * @param zkPath Path to register at. @see #create(ZooKeeperClient, java.util.Set) - * @return A server set that registers at {@code zkPath}. - */ - public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) { - return create(zkClient, acl, ImmutableSet.of(zkPath)); - } - - /** - * Creates a server set that registers at one or multiple paths applying the given ACL to all - * nodes created in the paths. - * - * @param zkClient ZooKeeper client to register with. - * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates. - * @param zkPaths Paths to register at, must be non-empty. - * @return A server set that registers at the given {@code zkPath}s. - */ - public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, Set<String> zkPaths) { - Preconditions.checkNotNull(zkClient); - MorePreconditions.checkNotBlank(acl); - MorePreconditions.checkNotBlank(zkPaths); - - if (zkPaths.size() == 1) { - return new ServerSetImpl(zkClient, acl, Iterables.getOnlyElement(zkPaths)); - } else { - ImmutableList.Builder<ServerSet> builder = ImmutableList.builder(); - for (String path : zkPaths) { - builder.add(new ServerSetImpl(zkClient, acl, path)); - } - return new CompoundServerSet(builder.build()); - } - } - - /** - * Returns a serialized Thrift service instance object, with given endpoints and codec. - * - * @param serviceInstance the Thrift service instance object to be serialized - * @param codec the codec to use to serialize a Thrift service instance object - * @return byte array that contains a serialized Thrift service instance - */ - public static byte[] serializeServiceInstance( - ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException { - - ByteArrayOutputStream output = new ByteArrayOutputStream(); - codec.serialize(serviceInstance, output); - return output.toByteArray(); - } - - /** - * Serializes a service instance based on endpoints. - * @see #serializeServiceInstance(ServiceInstance, Codec) - * - * @param address the target address of the service instance - * @param additionalEndpoints additional endpoints of the service instance - * @param status service status - */ - public static byte[] serializeServiceInstance( - InetSocketAddress address, - Map<String, Endpoint> additionalEndpoints, - Status status, - Codec<ServiceInstance> codec) throws IOException { - - ServiceInstance serviceInstance = - new ServiceInstance(toEndpoint(address), additionalEndpoints, status); - return serializeServiceInstance(serviceInstance, codec); - } - - /** - * Creates a service instance object deserialized from byte array. - * - * @param data the byte array contains a serialized Thrift service instance - * @param codec the codec to use to deserialize the byte array - */ - public static ServiceInstance deserializeServiceInstance( - byte[] data, Codec<ServiceInstance> codec) throws IOException { - - return codec.deserialize(new ByteArrayInputStream(data)); - } - - /** - * Creates an endpoint for the given InetSocketAddress. - * - * @param address the target address to create the endpoint for - */ - public static Endpoint toEndpoint(InetSocketAddress address) { - return new Endpoint(address.getHostName(), address.getPort()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java b/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java deleted file mode 100644 index 91e28f2..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/SingletonService.java +++ /dev/null @@ -1,315 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.Nullable; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; - -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; - -import com.twitter.common.base.ExceptionalCommand; -import com.twitter.common.zookeeper.Candidate.Leader; -import com.twitter.common.zookeeper.Group.JoinException; -import com.twitter.common.zookeeper.ServerSet.EndpointStatus; -import com.twitter.common.zookeeper.ServerSet.UpdateException; -import com.twitter.thrift.Status; - -/** - * A service that uses master election to only allow a single instance of the server to join - * the {@link ServerSet} at a time. - */ -public class SingletonService { - private static final Logger LOG = Logger.getLogger(SingletonService.class.getName()); - - @VisibleForTesting - static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_"; - - /** - * Creates a candidate that can be combined with an existing server set to form a singleton - * service using {@link #SingletonService(ServerSet, Candidate)}. - * - * @param zkClient The ZooKeeper client to use. - * @param servicePath The path where service nodes live. - * @param acl The acl to apply to newly created candidate nodes and serverset nodes. - * @return A candidate that can be housed with a standard server set under a single zk path. - */ - public static Candidate createSingletonCandidate( - ZooKeeperClient zkClient, - String servicePath, - Iterable<ACL> acl) { - - return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX)); - } - - private final ServerSet serverSet; - private final Candidate candidate; - - /** - * Equivalent to {@link #SingletonService(ZooKeeperClient, String, Iterable)} with a default - * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}). - */ - public SingletonService(ZooKeeperClient zkClient, String servicePath) { - this(zkClient, servicePath, ZooDefs.Ids.OPEN_ACL_UNSAFE); - } - - /** - * Creates a new singleton service, identified by {@code servicePath}. All nodes related to the - * service (for both leader election and service registration) will live under the path and each - * node will be created with the supplied {@code acl}. Internally, two ZooKeeper {@code Group}s - * are used to manage a singleton service - one for leader election, and another for the - * {@code ServerSet} where the leader's endpoints are registered. Leadership election should - * guarantee that at most one instance will ever exist in the ServerSet at once. - * - * @param zkClient The ZooKeeper client to use. - * @param servicePath The path where service nodes live. - * @param acl The acl to apply to newly created candidate nodes and serverset nodes. - */ - public SingletonService(ZooKeeperClient zkClient, String servicePath, Iterable<ACL> acl) { - this( - new ServerSetImpl(zkClient, new Group(zkClient, acl, servicePath)), - createSingletonCandidate(zkClient, servicePath, acl)); - } - - /** - * Creates a new singleton service that uses the supplied candidate to vie for leadership and then - * advertises itself in the given server set once elected. - * - * @param serverSet The server set to advertise in on election. - * @param candidate The candidacy to use to vie for election. - */ - public SingletonService(ServerSet serverSet, Candidate candidate) { - this.serverSet = Preconditions.checkNotNull(serverSet); - this.candidate = Preconditions.checkNotNull(candidate); - } - - /** - * Attempts to lead the singleton service. - * - * @param endpoint The primary endpoint to register as a leader candidate in the service. - * @param additionalEndpoints Additional endpoints that are available on the host. - * @param status deprecated, will be ignored entirely - * @param listener Handler to call when the candidate is elected or defeated. - * @throws Group.WatchException If there was a problem watching the ZooKeeper group. - * @throws Group.JoinException If there was a problem joining the ZooKeeper group. - * @throws InterruptedException If the thread watching/joining the group was interrupted. - * @deprecated The status field is deprecated. Please use - * {@link #lead(InetSocketAddress, Map, LeadershipListener)} - */ - @Deprecated - public void lead(final InetSocketAddress endpoint, - final Map<String, InetSocketAddress> additionalEndpoints, - final Status status, - final LeadershipListener listener) - throws Group.WatchException, Group.JoinException, InterruptedException { - - if (status != Status.ALIVE) { - LOG.severe("******************************************************************************"); - LOG.severe("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED."); - LOG.severe("JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status); - LOG.severe("******************************************************************************"); - } else { - LOG.warning("******************************************************************************"); - LOG.warning("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED."); - LOG.warning("Please use SingletonService.lead(InetSocketAddress, Map, LeadershipListener)"); - LOG.warning("******************************************************************************"); - } - - lead(endpoint, additionalEndpoints, listener); - } - - /** - * Attempts to lead the singleton service. - * - * @param endpoint The primary endpoint to register as a leader candidate in the service. - * @param additionalEndpoints Additional endpoints that are available on the host. - * @param listener Handler to call when the candidate is elected or defeated. - * @throws Group.WatchException If there was a problem watching the ZooKeeper group. - * @throws Group.JoinException If there was a problem joining the ZooKeeper group. - * @throws InterruptedException If the thread watching/joining the group was interrupted. - */ - public void lead(final InetSocketAddress endpoint, - final Map<String, InetSocketAddress> additionalEndpoints, - final LeadershipListener listener) - throws Group.WatchException, Group.JoinException, InterruptedException { - - Preconditions.checkNotNull(listener); - - candidate.offerLeadership(new Leader() { - private EndpointStatus endpointStatus = null; - @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) { - listener.onLeading(new LeaderControl() { - EndpointStatus endpointStatus = null; - final AtomicBoolean left = new AtomicBoolean(false); - - // Methods are synchronized to prevent simultaneous invocations. - @Override public synchronized void advertise() - throws JoinException, InterruptedException { - - Preconditions.checkState(!left.get(), "Cannot advertise after leaving."); - Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once."); - endpointStatus = serverSet.join(endpoint, additionalEndpoints); - } - - @Override public synchronized void leave() throws UpdateException, JoinException { - Preconditions.checkState(left.compareAndSet(false, true), - "Cannot leave more than once."); - if (endpointStatus != null) { - endpointStatus.leave(); - } - abdicate.execute(); - } - }); - } - - @Override public void onDefeated() { - listener.onDefeated(endpointStatus); - } - }); - } - - /** - * A listener to be notified of changes in the leadership status. - * Implementers should be careful to avoid blocking operations in these callbacks. - */ - public interface LeadershipListener { - - /** - * Notifies the listener that is is current leader. - * - * @param control A controller handle to advertise and/or leave advertised presence. - */ - public void onLeading(LeaderControl control); - - /** - * Notifies the listener that it is no longer leader. The leader should take this opportunity - * to remove its advertisement gracefully. - * - * @param status A handle on the endpoint status for the advertised leader. - */ - public void onDefeated(@Nullable EndpointStatus status); - } - - /** - * A leadership listener that decorates another listener by automatically defeating a - * leader that has dropped its connection to ZooKeeper. - * Note that the decision to use this over session-based mutual exclusion should not be taken - * lightly. Any momentary connection loss due to a flaky network or a ZooKeeper server process - * exit will cause a leader to abort. - */ - public static class DefeatOnDisconnectLeader implements LeadershipListener { - - private final LeadershipListener wrapped; - private Optional<LeaderControl> maybeControl = Optional.absent(); - - /** - * Creates a new leadership listener that will delegate calls to the wrapped listener, and - * invoke {@link #onDefeated(EndpointStatus)} if a ZooKeeper disconnect is observed while - * leading. - * - * @param zkClient The ZooKeeper client to watch for disconnect events. - * @param wrapped The leadership listener to wrap. - */ - public DefeatOnDisconnectLeader(ZooKeeperClient zkClient, LeadershipListener wrapped) { - this.wrapped = Preconditions.checkNotNull(wrapped); - - zkClient.register(new Watcher() { - @Override public void process(WatchedEvent event) { - if ((event.getType() == EventType.None) - && (event.getState() == KeeperState.Disconnected)) { - disconnected(); - } - } - }); - } - - private synchronized void disconnected() { - if (maybeControl.isPresent()) { - LOG.warning("Disconnected from ZooKeeper while leading, committing suicide."); - try { - wrapped.onDefeated(null); - maybeControl.get().leave(); - } catch (UpdateException e) { - LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e); - } catch (JoinException e) { - LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e); - } finally { - setControl(null); - } - } else { - LOG.info("Disconnected from ZooKeeper, but that's fine because I'm not the leader."); - } - } - - private synchronized void setControl(@Nullable LeaderControl control) { - this.maybeControl = Optional.fromNullable(control); - } - - @Override public void onLeading(final LeaderControl control) { - setControl(control); - wrapped.onLeading(new LeaderControl() { - @Override public void advertise() throws JoinException, InterruptedException { - control.advertise(); - } - - @Override public void leave() throws UpdateException, JoinException { - setControl(null); - control.leave(); - } - }); - } - - @Override public void onDefeated(@Nullable EndpointStatus status) { - setControl(null); - wrapped.onDefeated(status); - } - } - - /** - * A controller for the state of the leader. This will be provided to the leader upon election, - * which allows the leader to decide when to advertise in the underlying {@link ServerSet} and - * terminate leadership at will. - */ - public interface LeaderControl { - - /** - * Advertises the leader's server presence to clients. - * - * @throws JoinException If there was an error advertising. - * @throws InterruptedException If interrupted while advertising. - */ - void advertise() throws JoinException, InterruptedException; - - /** - * Leaves candidacy for leadership, removing advertised server presence if applicable. - * - * @throws UpdateException If the leader's status could not be updated. - * @throws JoinException If there was an error abdicating from leader election. - */ - void leave() throws UpdateException, JoinException; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java b/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java deleted file mode 100644 index 56adcfe..0000000 --- a/commons/src/main/java/com/twitter/common/zookeeper/StaticServerSet.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.common.zookeeper; - -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; - -import com.twitter.common.base.Command; -import com.twitter.common.base.Commands; -import com.twitter.common.zookeeper.Group.JoinException; -import com.twitter.thrift.Endpoint; -import com.twitter.thrift.ServiceInstance; -import com.twitter.thrift.Status; - -/** - * A server set that represents a fixed set of hosts. - * This may be composed under {@link CompoundServerSet} to ensure a minimum set of hosts is - * present. - * A static server set does not support joining, but will allow normal join calls and status update - * calls to be made. - */ -public class StaticServerSet implements ServerSet { - - private static final Logger LOG = Logger.getLogger(StaticServerSet.class.getName()); - - private static final Function<Endpoint, ServiceInstance> ENDPOINT_TO_INSTANCE = - new Function<Endpoint, ServiceInstance>() { - @Override public ServiceInstance apply(Endpoint endpoint) { - return new ServiceInstance(endpoint, ImmutableMap.<String, Endpoint>of(), Status.ALIVE); - } - }; - - private final ImmutableSet<ServiceInstance> hosts; - - /** - * Creates a static server set that will reply to monitor calls immediately and exactly once with - * the provided service instances. - * - * @param hosts Hosts in the static set. - */ - public StaticServerSet(Set<ServiceInstance> hosts) { - this.hosts = ImmutableSet.copyOf(hosts); - } - - /** - * Creates a static server set containing the provided endpoints (and no auxiliary ports) which - * will all be in the {@link Status#ALIVE} state. - * - * @param endpoints Endpoints in the static set. - * @return A static server set that will advertise the provided endpoints. - */ - public static StaticServerSet fromEndpoints(Set<Endpoint> endpoints) { - return new StaticServerSet( - ImmutableSet.copyOf(Iterables.transform(endpoints, ENDPOINT_TO_INSTANCE))); - } - - private EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> auxEndpoints, - Optional<Integer> shardId) { - - LOG.warning("Attempt to join fixed server set ignored."); - ServiceInstance joining = new ServiceInstance( - ServerSets.toEndpoint(endpoint), - Maps.transformValues(auxEndpoints, ServerSets.TO_ENDPOINT), - Status.ALIVE); - if (shardId.isPresent()) { - joining.setShard(shardId.get()); - } - if (!hosts.contains(joining)) { - LOG.log(Level.SEVERE, - "Joining instance " + joining + " does not match any member of the static set."); - } - - return new EndpointStatus() { - @Override public void leave() throws UpdateException { - LOG.warning("Attempt to adjust state of fixed server set ignored."); - } - - @Override public void update(Status status) throws UpdateException { - LOG.warning("Attempt to adjust state of fixed server set ignored."); - } - }; - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> auxEndpoints, - Status status) { - - LOG.warning("This method is deprecated. Please do not specify a status field."); - return join(endpoint, auxEndpoints, Optional.<Integer>absent()); - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> auxEndpoints) { - - LOG.warning("Joining a ServerSet without a shard ID is deprecated and will soon break."); - return join(endpoint, auxEndpoints, Optional.<Integer>absent()); - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, - Map<String, InetSocketAddress> auxEndpoints, - int shardId) throws JoinException, InterruptedException { - - return join(endpoint, auxEndpoints, Optional.of(shardId)); - } - - @Override - public Command watch(HostChangeMonitor<ServiceInstance> monitor) { - monitor.onChange(hosts); - return Commands.NOOP; - } - - @Override - public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { - watch(monitor); - } -}