http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java new file mode 100644 index 0000000..99a15fe --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java @@ -0,0 +1,708 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.util.List; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +import javax.annotation.Nullable; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.data.ACL; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.base.Commands; +import org.apache.aurora.common.base.ExceptionalSupplier; +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.util.BackoffHelper; +import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; + +/** + * This class exposes methods for joining and monitoring distributed groups. The groups this class + * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for + * each member of a group. + */ +public class Group { + private static final Logger LOG = Logger.getLogger(Group.class.getName()); + + private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null); + private static final String DEFAULT_NODE_NAME_PREFIX = "member_"; + + private final ZooKeeperClient zkClient; + private final ImmutableList<ACL> acl; + private final String path; + + private final NodeScheme nodeScheme; + private final Predicate<String> nodeNameFilter; + + private final BackoffHelper backoffHelper; + + /** + * Creates a group rooted at the given {@code path}. Paths must be absolute and trailing or + * duplicate slashes will be normalized. For example, all the following paths would create a + * group at the normalized path /my/distributed/group: + * <ul> + * <li>/my/distributed/group + * <li>/my/distributed/group/ + * <li>/my/distributed//group + * </ul> + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param acl the ACL to use for creating the persistent group path if it does not already exist + * @param path the absolute persistent path that represents this group + * @param nodeScheme the scheme that defines how nodes are created + */ + public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) { + this.zkClient = Preconditions.checkNotNull(zkClient); + this.acl = ImmutableList.copyOf(acl); + this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path)); + + this.nodeScheme = Preconditions.checkNotNull(nodeScheme); + nodeNameFilter = new Predicate<String>() { + @Override public boolean apply(String nodeName) { + return Group.this.nodeScheme.isMember(nodeName); + } + }; + + backoffHelper = new BackoffHelper(); + } + + /** + * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a + * {@code namePrefix} of 'member_'. + */ + public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) { + this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX); + } + + /** + * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a + * {@link DefaultScheme} using {@code namePrefix}. + */ + public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) { + this(zkClient, acl, path, new DefaultScheme(namePrefix)); + } + + public String getMemberPath(String memberId) { + return path + "/" + MorePreconditions.checkNotBlank(memberId); + } + + public String getPath() { + return path; + } + + public String getMemberId(String nodePath) { + MorePreconditions.checkNotBlank(nodePath); + Preconditions.checkArgument(nodePath.startsWith(path + "/"), + "Not a member of this group[%s]: %s", path, nodePath); + + String memberId = StringUtils.substringAfterLast(nodePath, "/"); + Preconditions.checkArgument(nodeScheme.isMember(memberId), + "Not a group member: %s", memberId); + return memberId; + } + + /** + * Returns the current list of group member ids by querying ZooKeeper synchronously. + * + * @return the ids of all the present members of this group + * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper + * @throws KeeperException if there was a problem reading this group's member ids + * @throws InterruptedException if this thread is interrupted listing the group members + */ + public Iterable<String> getMemberIds() + throws ZooKeeperConnectionException, KeeperException, InterruptedException { + return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter); + } + + /** + * Gets the data for one of this groups members by querying ZooKeeper synchronously. + * + * @param memberId the id of the member whose data to retrieve + * @return the data associated with the {@code memberId} + * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper + * @throws KeeperException if there was a problem reading this member's data + * @throws InterruptedException if this thread is interrupted retrieving the member data + */ + public byte[] getMemberData(String memberId) + throws ZooKeeperConnectionException, KeeperException, InterruptedException { + return zkClient.get().getData(getMemberPath(memberId), false, null); + } + + /** + * Represents membership in a distributed group. + */ + public interface Membership { + + /** + * Returns the persistent ZooKeeper path that represents this group. + */ + String getGroupPath(); + + /** + * Returns the id (ZooKeeper node name) of this group member. May change over time if the + * ZooKeeper session expires. + */ + String getMemberId(); + + /** + * Returns the full ZooKeeper path to this group member. May change over time if the + * ZooKeeper session expires. + */ + String getMemberPath(); + + /** + * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to + * {@link Group#join()}. + * + * @return the new membership data + * @throws UpdateException if there was a problem updating the membership data + */ + byte[] updateMemberData() throws UpdateException; + + /** + * Cancels group membership by deleting the associated ZooKeeper member node. + * + * @throws JoinException if there is a problem deleting the node + */ + void cancel() throws JoinException; + } + + /** + * Indicates an error joining a group. + */ + public static class JoinException extends Exception { + public JoinException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Indicates an error updating a group member's data. + */ + public static class UpdateException extends Exception { + public UpdateException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Equivalent to calling {@code join(null, null)}. + */ + public final Membership join() throws JoinException, InterruptedException { + return join(NO_MEMBER_DATA, null); + } + + /** + * Equivalent to calling {@code join(memberData, null)}. + */ + public final Membership join(Supplier<byte[]> memberData) + throws JoinException, InterruptedException { + + return join(memberData, null); + } + + /** + * Equivalent to calling {@code join(null, onLoseMembership)}. + */ + public final Membership join(@Nullable final Command onLoseMembership) + throws JoinException, InterruptedException { + + return join(NO_MEMBER_DATA, onLoseMembership); + } + + /** + * Joins this group and returns the resulting Membership when successful. Membership will be + * automatically cancelled when the current jvm process dies; however the returned Membership + * object can be used to cancel membership earlier. Unless + * {@link Group.Membership#cancel()} is called the membership will + * be maintained by re-establishing it silently in the background. + * + * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper. If an + * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses + * membership in the group. + * + * @param memberData a supplier of the data to store in the member node + * @param onLoseMembership a callback to notify when membership is lost + * @return a Membership object with the member details + * @throws JoinException if there was a problem joining the group + * @throws InterruptedException if this thread is interrupted awaiting completion of the join + */ + public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) + throws JoinException, InterruptedException { + + Preconditions.checkNotNull(memberData); + ensurePersistentGroupPath(); + + final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership); + return backoffHelper.doUntilResult(new ExceptionalSupplier<Membership, JoinException>() { + @Override public Membership get() throws JoinException { + try { + return groupJoiner.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Interrupted trying to join group at path: " + path, e); + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e); + return null; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error trying to join group at path: " + path, e); + return null; + } else { + throw new JoinException("Problem joining partition group at path: " + path, e); + } + } + } + }); + } + + private void ensurePersistentGroupPath() throws JoinException, InterruptedException { + backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() { + @Override public Boolean get() throws JoinException { + try { + ZooKeeperUtils.ensurePath(zkClient, acl, path); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Interrupted trying to ensure group at path: " + path, e); + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e); + return false; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error ensuring path: " + path, e); + return false; + } else { + throw new JoinException("Problem ensuring group at path: " + path, e); + } + } + } + }); + } + + private class ActiveMembership implements Membership { + private final Supplier<byte[]> memberData; + private final Command onLoseMembership; + private String nodePath; + private String memberId; + private volatile boolean cancelled; + private byte[] membershipData; + + public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) { + this.memberData = memberData; + this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership; + } + + @Override + public String getGroupPath() { + return path; + } + + @Override + public synchronized String getMemberId() { + return memberId; + } + + @Override + public synchronized String getMemberPath() { + return nodePath; + } + + @Override + public synchronized byte[] updateMemberData() throws UpdateException { + byte[] membershipData = memberData.get(); + if (!ArrayUtils.isEquals(this.membershipData, membershipData)) { + try { + zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION); + this.membershipData = membershipData; + } catch (KeeperException e) { + throw new UpdateException("Problem updating membership data.", e); + } catch (InterruptedException e) { + throw new UpdateException("Interrupted attempting to update membership data.", e); + } catch (ZooKeeperConnectionException e) { + throw new UpdateException( + "Could not connect to the ZooKeeper cluster to update membership data.", e); + } + } + return membershipData; + } + + @Override + public synchronized void cancel() throws JoinException { + if (!cancelled) { + try { + backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, JoinException>() { + @Override public Boolean get() throws JoinException { + try { + zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e); + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e); + return false; + } catch (NoNodeException e) { + LOG.info("Membership already cancelled, node at path: " + nodePath + + " has been deleted"); + return true; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error cancelling membership: " + nodePath, e); + return false; + } else { + throw new JoinException("Problem cancelling membership: " + nodePath, e); + } + } + } + }); + cancelled = true; // Prevent auto-re-join logic from undoing this cancel. + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JoinException("Problem cancelling membership: " + nodePath, e); + } + } + } + + private class CancelledException extends IllegalStateException { /* marker */ } + + synchronized Membership join() + throws ZooKeeperConnectionException, InterruptedException, KeeperException { + + if (cancelled) { + throw new CancelledException(); + } + + if (nodePath == null) { + // Re-join if our ephemeral node goes away due to session expiry - only needs to be + // registered once. + zkClient.registerExpirationHandler(new Command() { + @Override public void execute() { + tryJoin(); + } + }); + } + + byte[] membershipData = memberData.get(); + String nodeName = nodeScheme.createName(membershipData); + CreateMode createMode = nodeScheme.isSequential() + ? CreateMode.EPHEMERAL_SEQUENTIAL + : CreateMode.EPHEMERAL; + nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode); + memberId = Group.this.getMemberId(nodePath); + LOG.info("Set group member ID to " + memberId); + this.membershipData = membershipData; + + // Re-join if our ephemeral node goes away due to maliciousness. + zkClient.get().exists(nodePath, new Watcher() { + @Override public void process(WatchedEvent event) { + if (event.getType() == EventType.NodeDeleted) { + tryJoin(); + } + } + }); + + return this; + } + + private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin = + new ExceptionalSupplier<Boolean, InterruptedException>() { + @Override public Boolean get() throws InterruptedException { + try { + join(); + return true; + } catch (CancelledException e) { + // Lost a cancel race - that's ok. + return true; + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e); + return false; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error re-joining group: " + path, e); + return false; + } else { + throw new IllegalStateException("Permanent problem re-joining group: " + path, e); + } + } + } + }; + + private synchronized void tryJoin() { + onLoseMembership.execute(); + try { + backoffHelper.doUntilSuccess(tryJoin); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + String.format("Interrupted while trying to re-join group: %s, giving up", path), e); + } + } + } + + /** + * An interface to an object that listens for changes to a group's membership. + */ + public interface GroupChangeListener { + + /** + * Called whenever group membership changes with the new list of member ids. + * + * @param memberIds the current member ids + */ + void onGroupChange(Iterable<String> memberIds); + } + + /** + * An interface that dictates the scheme to use for storing and filtering nodes that represent + * members of a distributed group. + */ + public interface NodeScheme { + /** + * Determines if a child node is a member of a group by examining the node's name. + * + * @param nodeName the name of a child node found in a group + * @return {@code true} if {@code nodeName} identifies a group member in this scheme + */ + boolean isMember(String nodeName); + + /** + * Generates a node name for the node representing this process in the distributed group. + * + * @param membershipData the data that will be stored in this node + * @return the name for the node that will represent this process in the group + */ + String createName(byte[] membershipData); + + /** + * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes. + * + * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise + */ + boolean isSequential(); + } + + /** + * Indicates an error watching a group. + */ + public static class WatchException extends Exception { + public WatchException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Watches this group for the lifetime of this jvm process. This method will block until the + * current group members are available, notify the {@code groupChangeListener} and then return. + * All further changes to the group membership will cause notifications on a background thread. + * + * @param groupChangeListener the listener to notify of group membership change events + * @return A command which, when executed, will stop watching the group. + * @throws WatchException if there is a problem generating the 1st group membership list + * @throws InterruptedException if interrupted waiting to gather the 1st group membership list + */ + public final Command watch(final GroupChangeListener groupChangeListener) + throws WatchException, InterruptedException { + Preconditions.checkNotNull(groupChangeListener); + + try { + ensurePersistentGroupPath(); + } catch (JoinException e) { + throw new WatchException("Failed to create group path: " + path, e); + } + + final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener); + backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, WatchException>() { + @Override public Boolean get() throws WatchException { + try { + groupMonitor.watchGroup(); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new WatchException("Interrupted trying to watch group at path: " + path, e); + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e); + return null; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error trying to watch group at path: " + path, e); + return null; + } else { + throw new WatchException("Problem trying to watch group at path: " + path, e); + } + } + } + }); + return new Command() { + @Override public void execute() { + groupMonitor.stopWatching(); + } + }; + } + + /** + * Helps continuously monitor a group for membership changes. + */ + private class GroupMonitor { + private final GroupChangeListener groupChangeListener; + private volatile boolean stopped = false; + private Set<String> members; + + GroupMonitor(GroupChangeListener groupChangeListener) { + this.groupChangeListener = groupChangeListener; + } + + private final Watcher groupWatcher = new Watcher() { + @Override public final void process(WatchedEvent event) { + if (event.getType() == EventType.NodeChildrenChanged) { + tryWatchGroup(); + } + } + }; + + private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup = + new ExceptionalSupplier<Boolean, InterruptedException>() { + @Override public Boolean get() throws InterruptedException { + try { + watchGroup(); + return true; + } catch (ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, "Problem connecting to ZooKeeper, retrying", e); + return false; + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, "Temporary error re-watching group: " + path, e); + return false; + } else { + throw new IllegalStateException("Permanent problem re-watching group: " + path, e); + } + } + } + }; + + private void tryWatchGroup() { + if (stopped) { + return; + } + + try { + backoffHelper.doUntilSuccess(tryWatchGroup); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + String.format("Interrupted while trying to re-watch group: %s, giving up", path), e); + } + } + + private void watchGroup() + throws ZooKeeperConnectionException, InterruptedException, KeeperException { + + if (stopped) { + return; + } + + List<String> children = zkClient.get().getChildren(path, groupWatcher); + setMembers(Iterables.filter(children, nodeNameFilter)); + } + + private void stopWatching() { + // TODO(William Farner): Cancel the watch when + // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved. + LOG.info("Stopping watch on " + this); + stopped = true; + } + + synchronized void setMembers(Iterable<String> members) { + if (stopped) { + LOG.info("Suppressing membership update, no longer watching " + this); + return; + } + + if (this.members == null) { + // Reset our watch on the group if session expires - only needs to be registered once. + zkClient.registerExpirationHandler(new Command() { + @Override public void execute() { + tryWatchGroup(); + } + }); + } + + Set<String> membership = ImmutableSet.copyOf(members); + if (!membership.equals(this.members)) { + groupChangeListener.onGroupChange(members); + this.members = membership; + } + } + } + + /** + * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] + + * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the + * prefix is "member_", the node's full path will look something like + * {@code /discovery/servicename/member_0000000007}. + */ + public static class DefaultScheme implements NodeScheme { + private final String namePrefix; + private final Pattern namePattern; + + /** + * Creates a sequential node scheme based on the given node name prefix. + * + * @param namePrefix the prefix for the names of the member nodes + */ + public DefaultScheme(String namePrefix) { + this.namePrefix = MorePreconditions.checkNotBlank(namePrefix); + namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$"); + } + + @Override + public boolean isMember(String nodeName) { + return namePattern.matcher(nodeName).matches(); + } + + @Override + public String createName(byte[] membershipData) { + return namePrefix; + } + + @Override + public boolean isSequential() { + return true; + } + } + + @Override + public String toString() { + return "Group " + path; + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java new file mode 100644 index 0000000..91ea345 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java @@ -0,0 +1,172 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.Ordering; +import org.apache.aurora.common.zookeeper.Group.GroupChangeListener; +import org.apache.aurora.common.zookeeper.Group.JoinException; +import org.apache.aurora.common.zookeeper.Group.Membership; +import org.apache.aurora.common.zookeeper.Group.UpdateException; +import org.apache.aurora.common.zookeeper.Group.WatchException; +import org.apache.zookeeper.data.ACL; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.logging.Logger; + +/** + * A distributed mechanism for eventually arriving at an evenly partitioned space of long values. + * A typical usage would have a client on each of several hosts joining a logical partition (a + * "partition group") that represents some shared work. Clients could then process a subset of a + * full body of work by testing any given item of work with their partition filter. + * + * <p>Note that clients must be able to tolerate periods of duplicate processing by more than 1 + * partition as explained in {@link #join()}. + * + * @author John Sirois + */ +public class Partitioner { + + private static final Logger LOG = Logger.getLogger(Partitioner.class.getName()); + + private volatile int groupSize; + private volatile int groupIndex; + private final Group group; + + /** + * Constructs a representation of a partition group but does not join it. Note that the partition + * group path will be created as a persistent zookeeper path if it does not already exist. + * + * @param zkClient a client to use for joining the partition group and watching its membership + * @param acl the acl for this partition group + * @param path a zookeeper path that represents the partition group + */ + public Partitioner(ZooKeeperClient zkClient, List<ACL> acl, String path) { + group = new Group(zkClient, acl, path); + } + + @VisibleForTesting + int getGroupSize() { + return groupSize; + } + + /** + * Represents a slice of a partition group. The partition is dynamic and will adjust its size as + * members join and leave its partition group. + */ + public abstract static class Partition implements Predicate<Long>, Membership { + + /** + * Returns {@code true} if the given {@code value} is a member of this partition at this time. + */ + public abstract boolean isMember(long value); + + /** + * Gets number of members in the group at this time. + * + * @return number of members in the ZK group at this time. + */ + public abstract int getNumPartitions(); + + /** + * Evaluates partition membership based on the given {@code value}'s hash code. If the value + * is null it is never a member of a partition. + */ + boolean isMember(Object value) { + return (value != null) && isMember(value.hashCode()); + } + + /** + * Equivalent to {@link #isMember(long)} for all non-null values; however incurs unboxing + * overhead. + */ + @Override + public boolean apply(@Nullable Long input) { + return (input != null) && isMember(input); + } + } + + /** + * Attempts to join the partition group and claim a slice. When successful, a predicate is + * returned that can be used to test whether or not an item belongs to this partition. The + * predicate is dynamic such that as the group is further partitioned or partitions merge the + * predicate will claim a narrower or wider swath of the partition space respectively. Partition + * creation and merging is not instantaneous and clients should expect independent partitions to + * claim ownership of some items when partition membership is in flux. It is only in the steady + * state that a client should expect independent partitions to divide the partition space evenly + * and without overlap. + * + * <p>TODO(John Sirois): consider adding a version with a global timeout for the join operation. + * + * @return the partition representing the slice of the partition group this member can claim + * @throws JoinException if there was a problem joining the partition group + * @throws InterruptedException if interrupted while waiting to join the partition group + */ + public final Partition join() throws JoinException, InterruptedException { + final Membership membership = group.join(); + try { + group.watch(createGroupChangeListener(membership)); + } catch (WatchException e) { + membership.cancel(); + throw new JoinException("Problem establishing watch on group after joining it", e); + } + return new Partition() { + @Override public boolean isMember(long value) { + return (value % groupSize) == groupIndex; + } + + @Override public int getNumPartitions() { + return groupSize; + } + + @Override public String getGroupPath() { + return membership.getGroupPath(); + } + + @Override public String getMemberId() { + return membership.getMemberId(); + } + + @Override public String getMemberPath() { + return membership.getMemberPath(); + } + + @Override public byte[] updateMemberData() throws UpdateException { + return membership.updateMemberData(); + } + + @Override public void cancel() throws JoinException { + membership.cancel(); + } + }; + } + + @VisibleForTesting GroupChangeListener createGroupChangeListener(final Membership membership) { + return new GroupChangeListener() { + @Override public void onGroupChange(Iterable<String> memberIds) { + List<String> members = Ordering.natural().sortedCopy(memberIds); + int newSize = members.size(); + int newIndex = members.indexOf(membership.getMemberId()); + + LOG.info(String.format("Rebuilding group %s:%s [%d:%d]->[%d:%d]", + membership.getGroupPath(), members, groupSize, groupIndex, newSize, newIndex)); + + groupSize = newSize; + groupIndex = newIndex; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java new file mode 100644 index 0000000..fb578e1 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java @@ -0,0 +1,114 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import org.apache.aurora.common.net.pool.DynamicHostSet; +import org.apache.aurora.common.zookeeper.Group.JoinException; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; + +import java.net.InetSocketAddress; +import java.util.Map; + +/** + * A logical set of servers registered in ZooKeeper. Intended to be used by both servers in a + * common service and their clients. + * + * TODO(William Farner): Explore decoupling this from thrift. + */ +public interface ServerSet extends DynamicHostSet<ServiceInstance> { + + /** + * Attempts to join a server set for this logical service group. + * + * @param endpoint the primary service endpoint + * @param additionalEndpoints and additional endpoints keyed by their logical name + * @param status the current service status + * @return an EndpointStatus object that allows the endpoint to adjust its status + * @throws JoinException if there was a problem joining the server set + * @throws InterruptedException if interrupted while waiting to join the server set + * @deprecated The status field is deprecated. Please use {@link #join(InetSocketAddress, Map)} + */ + @Deprecated + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + Status status) throws JoinException, InterruptedException; + + /** + * Attempts to join a server set for this logical service group. + * + * @param endpoint the primary service endpoint + * @param additionalEndpoints and additional endpoints keyed by their logical name + * @return an EndpointStatus object that allows the endpoint to adjust its status + * @throws JoinException if there was a problem joining the server set + * @throws InterruptedException if interrupted while waiting to join the server set + */ + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints) + throws JoinException, InterruptedException; + + /** + * Attempts to join a server set for this logical service group. + * + * @param endpoint the primary service endpoint + * @param additionalEndpoints and additional endpoints keyed by their logical name + * @param shardId Unique shard identifier for this member of the service. + * @return an EndpointStatus object that allows the endpoint to adjust its status + * @throws JoinException if there was a problem joining the server set + * @throws InterruptedException if interrupted while waiting to join the server set + */ + EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + int shardId) throws JoinException, InterruptedException; + + /** + * A handle to a service endpoint's status data that allows updating it to track current events. + */ + public interface EndpointStatus { + + /** + * Attempts to update the status of the service endpoint associated with this endpoint. If the + * {@code status} is {@link Status#DEAD} then the endpoint will be removed from the server set. + * + * @param status the current status of the endpoint + * @throws UpdateException if there was a problem writing the update + * @deprecated Support for mutable status is deprecated. Please use {@link #leave()} + */ + @Deprecated + void update(Status status) throws UpdateException; + + /** + * Removes the endpoint from the server set. + * + * @throws UpdateException if there was a problem leaving the ServerSet. + */ + void leave() throws UpdateException; + } + + /** + * Indicates an error updating a service's status information. + */ + public static class UpdateException extends Exception { + public UpdateException(String message, Throwable cause) { + super(message, cause); + } + + public UpdateException(String message) { + super(message); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java new file mode 100644 index 0000000..9a33d3e --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java @@ -0,0 +1,602 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Throwables; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.collect.Sets.SetView; +import com.google.common.util.concurrent.UncheckedExecutionException; +import com.google.gson.Gson; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; + +import org.apache.aurora.common.args.Arg; +import org.apache.aurora.common.args.CmdLine; +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.base.Function; +import org.apache.aurora.common.base.Supplier; +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.io.CompatibilityCodec; +import org.apache.aurora.common.io.ThriftCodec; +import org.apache.aurora.common.util.BackoffHelper; + +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * ZooKeeper-backed implementation of {@link ServerSet}. + */ +public class ServerSetImpl implements ServerSet { + private static final Logger LOG = Logger.getLogger(ServerSetImpl.class.getName()); + + @CmdLine(name = "serverset_encode_json", + help = "If true, use JSON for encoding server set information." + + " Defaults to true (use JSON).") + private static final Arg<Boolean> ENCODE_JSON = Arg.create(true); + + private final ZooKeeperClient zkClient; + private final Group group; + private final Codec<ServiceInstance> codec; + private final BackoffHelper backoffHelper; + + /** + * Creates a new ServerSet using open ZooKeeper node ACLs. + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param path the name-service path of the service to connect to + */ + public ServerSetImpl(ZooKeeperClient zkClient, String path) { + this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path); + } + + /** + * Creates a new ServerSet for the given service {@code path}. + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param acl the ACL to use for creating the persistent group path if it does not already exist + * @param path the name-service path of the service to connect to + */ + public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) { + this(zkClient, new Group(zkClient, acl, path), createDefaultCodec()); + } + + /** + * Creates a new ServerSet using the given service {@code group}. + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param group the server group + */ + public ServerSetImpl(ZooKeeperClient zkClient, Group group) { + this(zkClient, group, createDefaultCodec()); + } + + /** + * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}. + * + * @param zkClient the client to use for interactions with ZooKeeper + * @param group the server group + * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and + * from a byte array + */ + public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) { + this.zkClient = checkNotNull(zkClient); + this.group = checkNotNull(group); + this.codec = checkNotNull(codec); + + // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable. + backoffHelper = new BackoffHelper(); + } + + @VisibleForTesting + ZooKeeperClient getZkClient() { + return zkClient; + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints) + throws Group.JoinException, InterruptedException { + + LOG.log(Level.WARNING, + "Joining a ServerSet without a shard ID is deprecated and will soon break."); + return join(endpoint, additionalEndpoints, Optional.<Integer>absent()); + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + int shardId) throws Group.JoinException, InterruptedException { + + return join(endpoint, additionalEndpoints, Optional.of(shardId)); + } + + private EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + Optional<Integer> shardId) throws Group.JoinException, InterruptedException { + + checkNotNull(endpoint); + checkNotNull(additionalEndpoints); + + final MemberStatus memberStatus = + new MemberStatus(endpoint, additionalEndpoints, shardId); + Supplier<byte[]> serviceInstanceSupplier = new Supplier<byte[]>() { + @Override public byte[] get() { + return memberStatus.serializeServiceInstance(); + } + }; + final Group.Membership membership = group.join(serviceInstanceSupplier); + + return new EndpointStatus() { + @Override public void update(Status status) throws UpdateException { + checkNotNull(status); + LOG.warning("This method is deprecated. Please use leave() instead."); + if (status == Status.DEAD) { + leave(); + } else { + LOG.warning("Status update has been ignored"); + } + } + + @Override public void leave() throws UpdateException { + memberStatus.leave(membership); + } + }; + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + Status status) throws Group.JoinException, InterruptedException { + + LOG.warning("This method is deprecated. Please do not specify a status field."); + if (status != Status.ALIVE) { + LOG.severe("**************************************************************************\n" + + "WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED.\n" + + "JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status + + "\n**************************************************************************"); + } + return join(endpoint, additionalEndpoints); + } + + @Override + public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { + ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor); + try { + return serverSetWatcher.watch(); + } catch (Group.WatchException e) { + throw new MonitorException("ZooKeeper watch failed.", e); + } catch (InterruptedException e) { + throw new MonitorException("Interrupted while watching ZooKeeper.", e); + } + } + + @Override + public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { + LOG.warning("This method is deprecated. Please use watch instead."); + watch(monitor); + } + + private class MemberStatus { + private final InetSocketAddress endpoint; + private final Map<String, InetSocketAddress> additionalEndpoints; + private final Optional<Integer> shardId; + + private MemberStatus( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> additionalEndpoints, + Optional<Integer> shardId) { + + this.endpoint = endpoint; + this.additionalEndpoints = additionalEndpoints; + this.shardId = shardId; + } + + synchronized void leave(Group.Membership membership) throws UpdateException { + try { + membership.cancel(); + } catch (Group.JoinException e) { + throw new UpdateException( + "Failed to auto-cancel group membership on transition to DEAD status", e); + } + } + + byte[] serializeServiceInstance() { + ServiceInstance serviceInstance = new ServiceInstance( + ServerSets.toEndpoint(endpoint), + Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT), + Status.ALIVE); + + if (shardId.isPresent()) { + serviceInstance.setShard(shardId.get()); + } + + LOG.fine("updating endpoint data to:\n\t" + serviceInstance); + try { + return ServerSets.serializeServiceInstance(serviceInstance, codec); + } catch (IOException e) { + throw new IllegalStateException("Unexpected problem serializing thrift struct " + + serviceInstance + "to a byte[]", e); + } + } + } + + private static class ServiceInstanceFetchException extends RuntimeException { + ServiceInstanceFetchException(String message, Throwable cause) { + super(message, cause); + } + } + + private static class ServiceInstanceDeletedException extends RuntimeException { + ServiceInstanceDeletedException(String path) { + super(path); + } + } + + private class ServerSetWatcher { + private final ZooKeeperClient zkClient; + private final HostChangeMonitor<ServiceInstance> monitor; + @Nullable private ImmutableSet<ServiceInstance> serverSet; + + ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) { + this.zkClient = zkClient; + this.monitor = monitor; + } + + public Command watch() throws Group.WatchException, InterruptedException { + Watcher onExpirationWatcher = zkClient.registerExpirationHandler(new Command() { + @Override public void execute() { + // Servers may have changed Status while we were disconnected from ZooKeeper, check and + // re-register our node watches. + rebuildServerSet(); + } + }); + + try { + return group.watch(new Group.GroupChangeListener() { + @Override public void onGroupChange(Iterable<String> memberIds) { + notifyGroupChange(memberIds); + } + }); + } catch (Group.WatchException e) { + zkClient.unregister(onExpirationWatcher); + throw e; + } catch (InterruptedException e) { + zkClient.unregister(onExpirationWatcher); + throw e; + } + } + + private ServiceInstance getServiceInstance(final String nodePath) { + try { + return backoffHelper.doUntilResult(new Supplier<ServiceInstance>() { + @Override public ServiceInstance get() { + try { + byte[] data = zkClient.get().getData(nodePath, false, null); + return ServerSets.deserializeServiceInstance(data, codec); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ServiceInstanceFetchException( + "Interrupted updating service data for: " + nodePath, e); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + LOG.log(Level.WARNING, + "Temporary error trying to updating service data for: " + nodePath, e); + return null; + } catch (NoNodeException e) { + invalidateNodePath(nodePath); + throw new ServiceInstanceDeletedException(nodePath); + } catch (KeeperException e) { + if (zkClient.shouldRetry(e)) { + LOG.log(Level.WARNING, + "Temporary error trying to update service data for: " + nodePath, e); + return null; + } else { + throw new ServiceInstanceFetchException( + "Failed to update service data for: " + nodePath, e); + } + } catch (IOException e) { + throw new ServiceInstanceFetchException( + "Failed to deserialize the ServiceInstance data for: " + nodePath, e); + } + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ServiceInstanceFetchException( + "Interrupted trying to update service data for: " + nodePath, e); + } + } + + private final LoadingCache<String, ServiceInstance> servicesByMemberId = + CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() { + @Override public ServiceInstance load(String memberId) { + return getServiceInstance(group.getMemberPath(memberId)); + } + }); + + private void rebuildServerSet() { + Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet()); + servicesByMemberId.invalidateAll(); + notifyGroupChange(memberIds); + } + + private String invalidateNodePath(String deletedPath) { + String memberId = group.getMemberId(deletedPath); + servicesByMemberId.invalidate(memberId); + return memberId; + } + + private final Function<String, ServiceInstance> MAYBE_FETCH_NODE = + new Function<String, ServiceInstance>() { + @Override public ServiceInstance apply(String memberId) { + // This get will trigger a fetch + try { + return servicesByMemberId.getUnchecked(memberId); + } catch (UncheckedExecutionException e) { + Throwable cause = e.getCause(); + if (!(cause instanceof ServiceInstanceDeletedException)) { + Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class); + throw new IllegalStateException( + "Unexpected error fetching member data for: " + memberId, e); + } + return null; + } + } + }; + + private synchronized void notifyGroupChange(Iterable<String> memberIds) { + ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds); + Set<String> existingMemberIds = servicesByMemberId.asMap().keySet(); + + // Ignore no-op state changes except for the 1st when we've seen no group yet. + if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) { + SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds); + // Implicit removal from servicesByMemberId. + existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds)); + + Iterable<ServiceInstance> serviceInstances = Iterables.filter( + Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull()); + + notifyServerSetChange(ImmutableSet.copyOf(serviceInstances)); + } + } + + private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) { + // ZK nodes may have changed if there was a session expiry for a server in the server set, but + // if the server's status has not changed, we can skip any onChange updates. + if (!currentServerSet.equals(serverSet)) { + if (currentServerSet.isEmpty()) { + LOG.warning("server set empty for path " + group.getPath()); + } else { + if (LOG.isLoggable(Level.INFO)) { + if (serverSet == null) { + LOG.info("received initial membership " + currentServerSet); + } else { + logChange(Level.INFO, currentServerSet); + } + } + } + serverSet = currentServerSet; + monitor.onChange(serverSet); + } + } + + private void logChange(Level level, ImmutableSet<ServiceInstance> newServerSet) { + StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: "); + if (serverSet.size() != newServerSet.size()) { + message.append("from ").append(serverSet.size()) + .append(" members to ").append(newServerSet.size()); + } + + Joiner joiner = Joiner.on("\n\t\t"); + + SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet); + if (!left.isEmpty()) { + message.append("\n\tleft:\n\t\t").append(joiner.join(left)); + } + + SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet); + if (!joined.isEmpty()) { + message.append("\n\tjoined:\n\t\t").append(joiner.join(joined)); + } + + LOG.log(level, message.toString()); + } + } + + private static class EndpointSchema { + final String host; + final Integer port; + + EndpointSchema(Endpoint endpoint) { + Preconditions.checkNotNull(endpoint); + this.host = endpoint.getHost(); + this.port = endpoint.getPort(); + } + + String getHost() { + return host; + } + + Integer getPort() { + return port; + } + } + + private static class ServiceInstanceSchema { + final EndpointSchema serviceEndpoint; + final Map<String, EndpointSchema> additionalEndpoints; + final Status status; + final Integer shard; + + ServiceInstanceSchema(ServiceInstance instance) { + this.serviceEndpoint = new EndpointSchema(instance.getServiceEndpoint()); + if (instance.getAdditionalEndpoints() != null) { + this.additionalEndpoints = Maps.transformValues( + instance.getAdditionalEndpoints(), + new Function<Endpoint, EndpointSchema>() { + @Override public EndpointSchema apply(Endpoint endpoint) { + return new EndpointSchema(endpoint); + } + } + ); + } else { + this.additionalEndpoints = Maps.newHashMap(); + } + this.status = instance.getStatus(); + this.shard = instance.isSetShard() ? instance.getShard() : null; + } + + EndpointSchema getServiceEndpoint() { + return serviceEndpoint; + } + + Map<String, EndpointSchema> getAdditionalEndpoints() { + return additionalEndpoints; + } + + Status getStatus() { + return status; + } + + Integer getShard() { + return shard; + } + } + + /** + * An adapted JSON codec that makes use of {@link ServiceInstanceSchema} to circumvent the + * __isset_bit_vector internal thrift struct field that tracks primitive types. + */ + private static class AdaptedJsonCodec implements Codec<ServiceInstance> { + private static final Charset ENCODING = Charsets.UTF_8; + private static final Class<ServiceInstanceSchema> CLASS = ServiceInstanceSchema.class; + private final Gson gson = new Gson(); + + @Override + public void serialize(ServiceInstance instance, OutputStream sink) throws IOException { + Writer w = new OutputStreamWriter(sink, ENCODING); + gson.toJson(new ServiceInstanceSchema(instance), CLASS, w); + w.flush(); + } + + @Override + public ServiceInstance deserialize(InputStream source) throws IOException { + ServiceInstanceSchema output = gson.fromJson(new InputStreamReader(source, ENCODING), CLASS); + Endpoint primary = new Endpoint( + output.getServiceEndpoint().getHost(), output.getServiceEndpoint().getPort()); + Map<String, Endpoint> additional = Maps.transformValues( + output.getAdditionalEndpoints(), + new Function<EndpointSchema, Endpoint>() { + @Override public Endpoint apply(EndpointSchema endpoint) { + return new Endpoint(endpoint.getHost(), endpoint.getPort()); + } + } + ); + ServiceInstance instance = + new ServiceInstance(primary, ImmutableMap.copyOf(additional), output.getStatus()); + if (output.getShard() != null) { + instance.setShard(output.getShard()); + } + return instance; + } + } + + private static Codec<ServiceInstance> createCodec(final boolean useJsonEncoding) { + final Codec<ServiceInstance> json = new AdaptedJsonCodec(); + final Codec<ServiceInstance> thrift = + ThriftCodec.create(ServiceInstance.class, ThriftCodec.BINARY_PROTOCOL); + final Predicate<byte[]> recognizer = new Predicate<byte[]>() { + public boolean apply(byte[] input) { + return (input.length > 1 && input[0] == '{' && input[1] == '\"') == useJsonEncoding; + } + }; + + if (useJsonEncoding) { + return CompatibilityCodec.create(json, thrift, 2, recognizer); + } + return CompatibilityCodec.create(thrift, json, 2, recognizer); + } + + /** + * Creates a codec for {@link ServiceInstance} objects that uses Thrift binary encoding, and can + * decode both Thrift and JSON encodings. + * + * @return a new codec instance. + */ + public static Codec<ServiceInstance> createThriftCodec() { + return createCodec(false); + } + + /** + * Creates a codec for {@link ServiceInstance} objects that uses JSON encoding, and can decode + * both Thrift and JSON encodings. + * + * @return a new codec instance. + */ + public static Codec<ServiceInstance> createJsonCodec() { + return createCodec(true); + } + + /** + * Returns a codec for {@link ServiceInstance} objects that uses either the Thrift or the JSON + * encoding, depending on whether the command line argument <tt>serverset_json_encofing</tt> is + * set to <tt>true</tt>, and can decode both Thrift and JSON encodings. + * + * @return a new codec instance. + */ + public static Codec<ServiceInstance> createDefaultCodec() { + return createCodec(ENCODE_JSON.get()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java new file mode 100644 index 0000000..2b99268 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java @@ -0,0 +1,148 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.zookeeper.data.ACL; + +import org.apache.aurora.common.base.Function; +import org.apache.aurora.common.base.MorePreconditions; +import org.apache.aurora.common.io.Codec; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; + +/** + * Common ServerSet related functions + */ +public class ServerSets { + + private ServerSets() { + // Utility class. + } + + /** + * A function that invokes {@link #toEndpoint(InetSocketAddress)}. + */ + public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT = + new Function<InetSocketAddress, Endpoint>() { + @Override public Endpoint apply(InetSocketAddress address) { + return ServerSets.toEndpoint(address); + } + }; + + /** + * Creates a server set that registers at a single path applying the given ACL to all nodes + * created in the path. + * + * @param zkClient ZooKeeper client to register with. + * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates. + * @param zkPath Path to register at. @see #create(ZooKeeperClient, java.util.Set) + * @return A server set that registers at {@code zkPath}. + */ + public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) { + return create(zkClient, acl, ImmutableSet.of(zkPath)); + } + + /** + * Creates a server set that registers at one or multiple paths applying the given ACL to all + * nodes created in the paths. + * + * @param zkClient ZooKeeper client to register with. + * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates. + * @param zkPaths Paths to register at, must be non-empty. + * @return A server set that registers at the given {@code zkPath}s. + */ + public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, Set<String> zkPaths) { + Preconditions.checkNotNull(zkClient); + MorePreconditions.checkNotBlank(acl); + MorePreconditions.checkNotBlank(zkPaths); + + if (zkPaths.size() == 1) { + return new ServerSetImpl(zkClient, acl, Iterables.getOnlyElement(zkPaths)); + } else { + ImmutableList.Builder<ServerSet> builder = ImmutableList.builder(); + for (String path : zkPaths) { + builder.add(new ServerSetImpl(zkClient, acl, path)); + } + return new CompoundServerSet(builder.build()); + } + } + + /** + * Returns a serialized Thrift service instance object, with given endpoints and codec. + * + * @param serviceInstance the Thrift service instance object to be serialized + * @param codec the codec to use to serialize a Thrift service instance object + * @return byte array that contains a serialized Thrift service instance + */ + public static byte[] serializeServiceInstance( + ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException { + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + codec.serialize(serviceInstance, output); + return output.toByteArray(); + } + + /** + * Serializes a service instance based on endpoints. + * @see #serializeServiceInstance(ServiceInstance, Codec) + * + * @param address the target address of the service instance + * @param additionalEndpoints additional endpoints of the service instance + * @param status service status + */ + public static byte[] serializeServiceInstance( + InetSocketAddress address, + Map<String, Endpoint> additionalEndpoints, + Status status, + Codec<ServiceInstance> codec) throws IOException { + + ServiceInstance serviceInstance = + new ServiceInstance(toEndpoint(address), additionalEndpoints, status); + return serializeServiceInstance(serviceInstance, codec); + } + + /** + * Creates a service instance object deserialized from byte array. + * + * @param data the byte array contains a serialized Thrift service instance + * @param codec the codec to use to deserialize the byte array + */ + public static ServiceInstance deserializeServiceInstance( + byte[] data, Codec<ServiceInstance> codec) throws IOException { + + return codec.deserialize(new ByteArrayInputStream(data)); + } + + /** + * Creates an endpoint for the given InetSocketAddress. + * + * @param address the target address to create the endpoint for + */ + public static Endpoint toEndpoint(InetSocketAddress address) { + return new Endpoint(address.getHostName(), address.getPort()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java new file mode 100644 index 0000000..660f3d6 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonService.java @@ -0,0 +1,314 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; + +import org.apache.aurora.common.base.ExceptionalCommand; +import org.apache.aurora.common.zookeeper.Candidate.Leader; +import org.apache.aurora.common.zookeeper.Group.JoinException; + +import org.apache.aurora.common.thrift.Status; + +/** + * A service that uses master election to only allow a single instance of the server to join + * the {@link ServerSet} at a time. + */ +public class SingletonService { + private static final Logger LOG = Logger.getLogger(SingletonService.class.getName()); + + @VisibleForTesting + static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_"; + + /** + * Creates a candidate that can be combined with an existing server set to form a singleton + * service using {@link #SingletonService(ServerSet, Candidate)}. + * + * @param zkClient The ZooKeeper client to use. + * @param servicePath The path where service nodes live. + * @param acl The acl to apply to newly created candidate nodes and serverset nodes. + * @return A candidate that can be housed with a standard server set under a single zk path. + */ + public static Candidate createSingletonCandidate( + ZooKeeperClient zkClient, + String servicePath, + Iterable<ACL> acl) { + + return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX)); + } + + private final ServerSet serverSet; + private final Candidate candidate; + + /** + * Equivalent to {@link #SingletonService(ZooKeeperClient, String, Iterable)} with a default + * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}). + */ + public SingletonService(ZooKeeperClient zkClient, String servicePath) { + this(zkClient, servicePath, ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + + /** + * Creates a new singleton service, identified by {@code servicePath}. All nodes related to the + * service (for both leader election and service registration) will live under the path and each + * node will be created with the supplied {@code acl}. Internally, two ZooKeeper {@code Group}s + * are used to manage a singleton service - one for leader election, and another for the + * {@code ServerSet} where the leader's endpoints are registered. Leadership election should + * guarantee that at most one instance will ever exist in the ServerSet at once. + * + * @param zkClient The ZooKeeper client to use. + * @param servicePath The path where service nodes live. + * @param acl The acl to apply to newly created candidate nodes and serverset nodes. + */ + public SingletonService(ZooKeeperClient zkClient, String servicePath, Iterable<ACL> acl) { + this( + new ServerSetImpl(zkClient, new Group(zkClient, acl, servicePath)), + createSingletonCandidate(zkClient, servicePath, acl)); + } + + /** + * Creates a new singleton service that uses the supplied candidate to vie for leadership and then + * advertises itself in the given server set once elected. + * + * @param serverSet The server set to advertise in on election. + * @param candidate The candidacy to use to vie for election. + */ + public SingletonService(ServerSet serverSet, Candidate candidate) { + this.serverSet = Preconditions.checkNotNull(serverSet); + this.candidate = Preconditions.checkNotNull(candidate); + } + + /** + * Attempts to lead the singleton service. + * + * @param endpoint The primary endpoint to register as a leader candidate in the service. + * @param additionalEndpoints Additional endpoints that are available on the host. + * @param status deprecated, will be ignored entirely + * @param listener Handler to call when the candidate is elected or defeated. + * @throws Group.WatchException If there was a problem watching the ZooKeeper group. + * @throws Group.JoinException If there was a problem joining the ZooKeeper group. + * @throws InterruptedException If the thread watching/joining the group was interrupted. + * @deprecated The status field is deprecated. Please use + * {@link #lead(InetSocketAddress, Map, LeadershipListener)} + */ + @Deprecated + public void lead(final InetSocketAddress endpoint, + final Map<String, InetSocketAddress> additionalEndpoints, + final Status status, + final LeadershipListener listener) + throws Group.WatchException, Group.JoinException, InterruptedException { + + if (status != Status.ALIVE) { + LOG.severe("******************************************************************************"); + LOG.severe("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED."); + LOG.severe("JOINING WITH STATUS ALIVE EVEN THOUGH YOU SPECIFIED " + status); + LOG.severe("******************************************************************************"); + } else { + LOG.warning("******************************************************************************"); + LOG.warning("WARNING: MUTABLE STATUS FIELDS ARE NO LONGER SUPPORTED."); + LOG.warning("Please use SingletonService.lead(InetSocketAddress, Map, LeadershipListener)"); + LOG.warning("******************************************************************************"); + } + + lead(endpoint, additionalEndpoints, listener); + } + + /** + * Attempts to lead the singleton service. + * + * @param endpoint The primary endpoint to register as a leader candidate in the service. + * @param additionalEndpoints Additional endpoints that are available on the host. + * @param listener Handler to call when the candidate is elected or defeated. + * @throws Group.WatchException If there was a problem watching the ZooKeeper group. + * @throws Group.JoinException If there was a problem joining the ZooKeeper group. + * @throws InterruptedException If the thread watching/joining the group was interrupted. + */ + public void lead(final InetSocketAddress endpoint, + final Map<String, InetSocketAddress> additionalEndpoints, + final LeadershipListener listener) + throws Group.WatchException, Group.JoinException, InterruptedException { + + Preconditions.checkNotNull(listener); + + candidate.offerLeadership(new Leader() { + private ServerSet.EndpointStatus endpointStatus = null; + @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) { + listener.onLeading(new LeaderControl() { + ServerSet.EndpointStatus endpointStatus = null; + final AtomicBoolean left = new AtomicBoolean(false); + + // Methods are synchronized to prevent simultaneous invocations. + @Override public synchronized void advertise() + throws JoinException, InterruptedException { + + Preconditions.checkState(!left.get(), "Cannot advertise after leaving."); + Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once."); + endpointStatus = serverSet.join(endpoint, additionalEndpoints); + } + + @Override public synchronized void leave() throws ServerSet.UpdateException, JoinException { + Preconditions.checkState(left.compareAndSet(false, true), + "Cannot leave more than once."); + if (endpointStatus != null) { + endpointStatus.leave(); + } + abdicate.execute(); + } + }); + } + + @Override public void onDefeated() { + listener.onDefeated(endpointStatus); + } + }); + } + + /** + * A listener to be notified of changes in the leadership status. + * Implementers should be careful to avoid blocking operations in these callbacks. + */ + public interface LeadershipListener { + + /** + * Notifies the listener that is is current leader. + * + * @param control A controller handle to advertise and/or leave advertised presence. + */ + public void onLeading(LeaderControl control); + + /** + * Notifies the listener that it is no longer leader. The leader should take this opportunity + * to remove its advertisement gracefully. + * + * @param status A handle on the endpoint status for the advertised leader. + */ + public void onDefeated(@Nullable ServerSet.EndpointStatus status); + } + + /** + * A leadership listener that decorates another listener by automatically defeating a + * leader that has dropped its connection to ZooKeeper. + * Note that the decision to use this over session-based mutual exclusion should not be taken + * lightly. Any momentary connection loss due to a flaky network or a ZooKeeper server process + * exit will cause a leader to abort. + */ + public static class DefeatOnDisconnectLeader implements LeadershipListener { + + private final LeadershipListener wrapped; + private Optional<LeaderControl> maybeControl = Optional.absent(); + + /** + * Creates a new leadership listener that will delegate calls to the wrapped listener, and + * invoke {@link #onDefeated(ServerSet.EndpointStatus)} if a ZooKeeper disconnect is observed while + * leading. + * + * @param zkClient The ZooKeeper client to watch for disconnect events. + * @param wrapped The leadership listener to wrap. + */ + public DefeatOnDisconnectLeader(ZooKeeperClient zkClient, LeadershipListener wrapped) { + this.wrapped = Preconditions.checkNotNull(wrapped); + + zkClient.register(new Watcher() { + @Override public void process(WatchedEvent event) { + if ((event.getType() == EventType.None) + && (event.getState() == KeeperState.Disconnected)) { + disconnected(); + } + } + }); + } + + private synchronized void disconnected() { + if (maybeControl.isPresent()) { + LOG.warning("Disconnected from ZooKeeper while leading, committing suicide."); + try { + wrapped.onDefeated(null); + maybeControl.get().leave(); + } catch (ServerSet.UpdateException e) { + LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e); + } catch (JoinException e) { + LOG.log(Level.WARNING, "Failed to leave singleton service: " + e, e); + } finally { + setControl(null); + } + } else { + LOG.info("Disconnected from ZooKeeper, but that's fine because I'm not the leader."); + } + } + + private synchronized void setControl(@Nullable LeaderControl control) { + this.maybeControl = Optional.fromNullable(control); + } + + @Override public void onLeading(final LeaderControl control) { + setControl(control); + wrapped.onLeading(new LeaderControl() { + @Override public void advertise() throws JoinException, InterruptedException { + control.advertise(); + } + + @Override public void leave() throws ServerSet.UpdateException, JoinException { + setControl(null); + control.leave(); + } + }); + } + + @Override public void onDefeated(@Nullable ServerSet.EndpointStatus status) { + setControl(null); + wrapped.onDefeated(status); + } + } + + /** + * A controller for the state of the leader. This will be provided to the leader upon election, + * which allows the leader to decide when to advertise in the underlying {@link ServerSet} and + * terminate leadership at will. + */ + public interface LeaderControl { + + /** + * Advertises the leader's server presence to clients. + * + * @throws JoinException If there was an error advertising. + * @throws InterruptedException If interrupted while advertising. + */ + void advertise() throws JoinException, InterruptedException; + + /** + * Leaves candidacy for leadership, removing advertised server presence if applicable. + * + * @throws ServerSet.UpdateException If the leader's status could not be updated. + * @throws JoinException If there was an error abdicating from leader election. + */ + void leave() throws ServerSet.UpdateException, JoinException; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java new file mode 100644 index 0000000..99c290e --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java @@ -0,0 +1,145 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + +import org.apache.aurora.common.base.Command; +import org.apache.aurora.common.base.Commands; +import org.apache.aurora.common.zookeeper.Group.JoinException; +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; + +/** + * A server set that represents a fixed set of hosts. + * This may be composed under {@link CompoundServerSet} to ensure a minimum set of hosts is + * present. + * A static server set does not support joining, but will allow normal join calls and status update + * calls to be made. + */ +public class StaticServerSet implements ServerSet { + + private static final Logger LOG = Logger.getLogger(StaticServerSet.class.getName()); + + private static final Function<Endpoint, ServiceInstance> ENDPOINT_TO_INSTANCE = + new Function<Endpoint, ServiceInstance>() { + @Override public ServiceInstance apply(Endpoint endpoint) { + return new ServiceInstance(endpoint, ImmutableMap.<String, Endpoint>of(), Status.ALIVE); + } + }; + + private final ImmutableSet<ServiceInstance> hosts; + + /** + * Creates a static server set that will reply to monitor calls immediately and exactly once with + * the provided service instances. + * + * @param hosts Hosts in the static set. + */ + public StaticServerSet(Set<ServiceInstance> hosts) { + this.hosts = ImmutableSet.copyOf(hosts); + } + + /** + * Creates a static server set containing the provided endpoints (and no auxiliary ports) which + * will all be in the {@link Status#ALIVE} state. + * + * @param endpoints Endpoints in the static set. + * @return A static server set that will advertise the provided endpoints. + */ + public static StaticServerSet fromEndpoints(Set<Endpoint> endpoints) { + return new StaticServerSet( + ImmutableSet.copyOf(Iterables.transform(endpoints, ENDPOINT_TO_INSTANCE))); + } + + private EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> auxEndpoints, + Optional<Integer> shardId) { + + LOG.warning("Attempt to join fixed server set ignored."); + ServiceInstance joining = new ServiceInstance( + ServerSets.toEndpoint(endpoint), + Maps.transformValues(auxEndpoints, ServerSets.TO_ENDPOINT), + Status.ALIVE); + if (shardId.isPresent()) { + joining.setShard(shardId.get()); + } + if (!hosts.contains(joining)) { + LOG.log(Level.SEVERE, + "Joining instance " + joining + " does not match any member of the static set."); + } + + return new EndpointStatus() { + @Override public void leave() throws UpdateException { + LOG.warning("Attempt to adjust state of fixed server set ignored."); + } + + @Override public void update(Status status) throws UpdateException { + LOG.warning("Attempt to adjust state of fixed server set ignored."); + } + }; + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> auxEndpoints, + Status status) { + + LOG.warning("This method is deprecated. Please do not specify a status field."); + return join(endpoint, auxEndpoints, Optional.<Integer>absent()); + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> auxEndpoints) { + + LOG.warning("Joining a ServerSet without a shard ID is deprecated and will soon break."); + return join(endpoint, auxEndpoints, Optional.<Integer>absent()); + } + + @Override + public EndpointStatus join( + InetSocketAddress endpoint, + Map<String, InetSocketAddress> auxEndpoints, + int shardId) throws JoinException, InterruptedException { + + return join(endpoint, auxEndpoints, Optional.of(shardId)); + } + + @Override + public Command watch(HostChangeMonitor<ServiceInstance> monitor) { + monitor.onChange(hosts); + return Commands.NOOP; + } + + @Override + public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { + watch(monitor); + } +}