Move zkclient from I0ITec to Helix codebase.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7fc03f4c Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7fc03f4c Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7fc03f4c Branch: refs/heads/master Commit: 7fc03f4c3f66625ed347a130be4205ce9d9419b4 Parents: 9b9da19 Author: Lei Xia <[email protected]> Authored: Wed Dec 13 15:57:28 2017 -0800 Committer: Junkai Xue <[email protected]> Committed: Wed Jan 24 18:32:32 2018 -0800 ---------------------------------------------------------------------- .../org/apache/helix/manager/zk/ZkClient.java | 3 +- .../helix/manager/zk/zookeeper/ZkClient.java | 1226 ++++++++++++++++++ .../manager/zk/zookeeper/ZkEventThread.java | 85 ++ 3 files changed, 1313 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/7fc03f4c/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java index c9f7ccf..182c77e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java @@ -48,8 +48,9 @@ import java.util.concurrent.Callable; * ZkClient jar Ideally we should commit the changes we do here to ZKClient. */ -public class ZkClient extends org.I0Itec.zkclient.ZkClient { +public class ZkClient extends org.apache.helix.manager.zk.zookeeper.ZkClient { private static Logger LOG = LoggerFactory.getLogger(ZkClient.class); + public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000; public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000; http://git-wip-us.apache.org/repos/asf/helix/blob/7fc03f4c/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java new file mode 100644 index 0000000..d26a274 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java @@ -0,0 +1,1226 @@ +/** + * Copyright 2010 the original author or authors. + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.helix.manager.zk.zookeeper; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; + +import org.I0Itec.zkclient.DataUpdater; +import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.IZkConnection; +import org.I0Itec.zkclient.IZkDataListener; +import org.I0Itec.zkclient.IZkStateListener; +import org.I0Itec.zkclient.ZkConnection; +import org.I0Itec.zkclient.ZkLock; +import org.I0Itec.zkclient.exception.ZkBadVersionException; +import org.I0Itec.zkclient.exception.ZkException; +import org.I0Itec.zkclient.exception.ZkInterruptedException; +import org.I0Itec.zkclient.exception.ZkNoNodeException; +import org.I0Itec.zkclient.exception.ZkNodeExistsException; +import org.I0Itec.zkclient.exception.ZkTimeoutException; +import org.I0Itec.zkclient.ExceptionUtil; +import org.I0Itec.zkclient.serialize.SerializableSerializer; +import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.ConnectionLossException; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper + */ +public class ZkClient implements Watcher { + private static Logger LOG = LoggerFactory.getLogger(ZkClient.class); + + protected final IZkConnection _connection; + protected final long operationRetryTimeoutInMillis; + private final Map<String, Set<IZkChildListener>> _childListener = + new ConcurrentHashMap<String, Set<IZkChildListener>>(); + private final ConcurrentHashMap<String, Set<IZkDataListener>> _dataListener = + new ConcurrentHashMap<String, Set<IZkDataListener>>(); + private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<IZkStateListener>(); + private KeeperState _currentState; + private final ZkLock _zkEventLock = new ZkLock(); + private boolean _shutdownTriggered; + private ZkEventThread _eventThread; + // TODO PVo remove this later + private Thread _zookeeperEventThread; + private ZkSerializer _zkSerializer; + private volatile boolean _closed; + + public ZkClient(String serverstring) { + this(serverstring, Integer.MAX_VALUE); + } + + public ZkClient(String zkServers, int connectionTimeout) { + this(new ZkConnection(zkServers), connectionTimeout); + } + + public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) { + this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout); + } + + public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, + ZkSerializer zkSerializer) { + this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer); + } + + /** + * + * @param zkServers + * The Zookeeper servers + * @param sessionTimeout + * The session timeout in milli seconds + * @param connectionTimeout + * The connection timeout in milli seconds + * @param zkSerializer + * The Zookeeper data serializer + * @param operationRetryTimeout + * Most operations done through this {@link org.I0Itec.zkclient.ZkClient} are retried in cases like + * connection loss with the Zookeeper servers. During such failures, this + * <code>operationRetryTimeout</code> decides the maximum amount of time, in milli seconds, each + * operation is retried. A value lesser than 0 is considered as + * "retry forever until a connection has been reestablished". + */ + public ZkClient(final String zkServers, final int sessionTimeout, final int connectionTimeout, + final ZkSerializer zkSerializer, final long operationRetryTimeout) { + this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer, + operationRetryTimeout); + } + + public ZkClient(IZkConnection connection) { + this(connection, Integer.MAX_VALUE); + } + + public ZkClient(IZkConnection connection, int connectionTimeout) { + this(connection, connectionTimeout, new SerializableSerializer()); + } + + public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer) { + this(zkConnection, connectionTimeout, zkSerializer, -1); + } + + /** + * + * @param zkConnection + * The Zookeeper servers + * @param connectionTimeout + * The connection timeout in milli seconds + * @param zkSerializer + * The Zookeeper data serializer + * @param operationRetryTimeout + * Most operations done through this {@link org.I0Itec.zkclient.ZkClient} are retried in cases like + * connection loss with the Zookeeper servers. During such failures, this + * <code>operationRetryTimeout</code> decides the maximum amount of time, in milli seconds, each + * operation is retried. A value lesser than 0 is considered as + * "retry forever until a connection has been reestablished". + */ + public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, + final ZkSerializer zkSerializer, final long operationRetryTimeout) { + if (zkConnection == null) { + throw new NullPointerException("Zookeeper connection is null!"); + } + _connection = zkConnection; + _zkSerializer = zkSerializer; + this.operationRetryTimeoutInMillis = operationRetryTimeout; + connect(connectionTimeout, this); + } + + public void setZkSerializer(ZkSerializer zkSerializer) { + _zkSerializer = zkSerializer; + } + + public List<String> subscribeChildChanges(String path, IZkChildListener listener) { + synchronized (_childListener) { + Set<IZkChildListener> listeners = _childListener.get(path); + if (listeners == null) { + listeners = new CopyOnWriteArraySet<IZkChildListener>(); + _childListener.put(path, listeners); + } + listeners.add(listener); + } + return watchForChilds(path); + } + + public void unsubscribeChildChanges(String path, IZkChildListener childListener) { + synchronized (_childListener) { + final Set<IZkChildListener> listeners = _childListener.get(path); + if (listeners != null) { + listeners.remove(childListener); + } + } + } + + public void subscribeDataChanges(String path, IZkDataListener listener) { + Set<IZkDataListener> listeners; + synchronized (_dataListener) { + listeners = _dataListener.get(path); + if (listeners == null) { + listeners = new CopyOnWriteArraySet<IZkDataListener>(); + _dataListener.put(path, listeners); + } + listeners.add(listener); + } + watchForData(path); + LOG.debug("Subscribed data changes for " + path); + } + + public void unsubscribeDataChanges(String path, IZkDataListener dataListener) { + synchronized (_dataListener) { + final Set<IZkDataListener> listeners = _dataListener.get(path); + if (listeners != null) { + listeners.remove(dataListener); + } + if (listeners == null || listeners.isEmpty()) { + _dataListener.remove(path); + } + } + } + + public void subscribeStateChanges(final IZkStateListener listener) { + synchronized (_stateListener) { + _stateListener.add(listener); + } + } + + public void unsubscribeStateChanges(IZkStateListener stateListener) { + synchronized (_stateListener) { + _stateListener.remove(stateListener); + } + } + + public void unsubscribeAll() { + synchronized (_childListener) { + _childListener.clear(); + } + synchronized (_dataListener) { + _dataListener.clear(); + } + synchronized (_stateListener) { + _stateListener.clear(); + } + } + + // </listeners> + + /** + * Create a persistent node. + * + * @param path + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public void createPersistent(String path) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + createPersistent(path, false); + } + + /** + * Create a persistent node and set its ACLs. + * + * @param path + * @param createParents + * if true all parent dirs are created as well and no {@link ZkNodeExistsException} is thrown in case the + * path already exists + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public void createPersistent(String path, boolean createParents) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE); + } + + /** + * Create a persistent node and set its ACLs. + * + * @param path + * @param acl + * List of ACL permissions to assign to the node + * @param createParents + * if true all parent dirs are created as well and no {@link ZkNodeExistsException} is thrown in case the + * path already exists + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public void createPersistent(String path, boolean createParents, List<ACL> acl) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + try { + create(path, null, acl, CreateMode.PERSISTENT); + } catch (ZkNodeExistsException e) { + if (!createParents) { + throw e; + } + } catch (ZkNoNodeException e) { + if (!createParents) { + throw e; + } + String parentDir = path.substring(0, path.lastIndexOf('/')); + createPersistent(parentDir, createParents, acl); + createPersistent(path, createParents, acl); + } + } + + /** + * Create a persistent node. + * + * @param path + * @param data + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public void createPersistent(String path, Object data) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + create(path, data, CreateMode.PERSISTENT); + } + + /** + * Create a persistent node. + * + * @param path + * @param data + * @param acl + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public void createPersistent(String path, Object data, List<ACL> acl) { + create(path, data, acl, CreateMode.PERSISTENT); + } + + /** + * Create a persistent, sequental node. + * + * @param path + * @param data + * @return create node's path + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public String createPersistentSequential(String path, Object data) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL); + } + + /** + * Create a persistent, sequential node and set its ACL. + * + * @param path + * @param acl + * @param data + * @return create node's path + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public String createPersistentSequential(String path, Object data, List<ACL> acl) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL); + } + + /** + * Create an ephemeral node. + * + * @param path + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public void createEphemeral(final String path) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + create(path, null, CreateMode.EPHEMERAL); + } + + /** + * Create an ephemeral node and set its ACL. + * + * @param path + * @param acl + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public void createEphemeral(final String path, final List<ACL> acl) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + create(path, null, acl, CreateMode.EPHEMERAL); + } + + /** + * Create a node. + * + * @param path + * @param data + * @param mode + * @return create node's path + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public String create(final String path, Object data, final CreateMode mode) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode); + } + + /** + * Create a node with ACL. + * + * @param path + * @param data + * @param acl + * @param mode + * @return create node's path + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode) { + if (path == null) { + throw new NullPointerException("Missing value for path"); + } + if (acl == null || acl.size() == 0) { + throw new NullPointerException("Missing value for ACL"); + } + final byte[] bytes = data == null ? null : serialize(data); + + return retryUntilConnected(new Callable<String>() { + @Override public String call() throws Exception { + return _connection.create(path, bytes, acl, mode); + } + }); + + } + + /** + * Create an ephemeral node. + * + * @param path + * @param data + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public void createEphemeral(final String path, final Object data) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + create(path, data, CreateMode.EPHEMERAL); + } + + /** + * Create an ephemeral node. + * + * @param path + * @param data + * @param acl + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public void createEphemeral(final String path, final Object data, final List<ACL> acl) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + create(path, data, acl, CreateMode.EPHEMERAL); + } + + /** + * Create an ephemeral, sequential node. + * + * @param path + * @param data + * @return created path + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public String createEphemeralSequential(final String path, final Object data) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL); + } + + /** + * Create an ephemeral, sequential node with ACL. + * + * @param path + * @param data + * @param acl + * @return created path + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs + */ + public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL); + } + + @Override public void process(WatchedEvent event) { + LOG.debug("Received event: " + event); + _zookeeperEventThread = Thread.currentThread(); + + boolean stateChanged = event.getPath() == null; + boolean znodeChanged = event.getPath() != null; + boolean dataChanged = + event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted + || event.getType() == EventType.NodeCreated + || event.getType() == EventType.NodeChildrenChanged; + + getEventLock().lock(); + try { + + // We might have to install child change event listener if a new node was created + if (getShutdownTrigger()) { + LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + + "}' since shutdown triggered"); + return; + } + if (stateChanged) { + processStateChanged(event); + } + if (dataChanged) { + processDataOrChildChange(event); + } + } finally { + if (stateChanged) { + getEventLock().getStateChangedCondition().signalAll(); + + // If the session expired we have to signal all conditions, because watches might have been removed and + // there is no guarantee that those + // conditions will be signaled at all after an Expired event + // TODO PVo write a test for this + if (event.getState() == KeeperState.Expired) { + getEventLock().getZNodeEventCondition().signalAll(); + getEventLock().getDataChangedCondition().signalAll(); + // We also have to notify all listeners that something might have changed + fireAllEvents(); + } + } + if (znodeChanged) { + getEventLock().getZNodeEventCondition().signalAll(); + } + if (dataChanged) { + getEventLock().getDataChangedCondition().signalAll(); + } + getEventLock().unlock(); + LOG.debug("Leaving process event"); + } + } + + private void fireAllEvents() { + for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) { + fireChildChangedEvents(entry.getKey(), entry.getValue()); + } + for (Entry<String, Set<IZkDataListener>> entry : _dataListener.entrySet()) { + fireDataChangedEvents(entry.getKey(), entry.getValue()); + } + } + + public List<String> getChildren(String path) { + return getChildren(path, hasListeners(path)); + } + + protected List<String> getChildren(final String path, final boolean watch) { + return retryUntilConnected(new Callable<List<String>>() { + @Override public List<String> call() throws Exception { + return _connection.getChildren(path, watch); + } + }); + } + + /** + * Counts number of children for the given path. + * + * @param path + * @return number of children or 0 if path does not exist. + */ + public int countChildren(String path) { + try { + return getChildren(path).size(); + } catch (ZkNoNodeException e) { + return 0; + } + } + + protected boolean exists(final String path, final boolean watch) { + return retryUntilConnected(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return _connection.exists(path, watch); + } + }); + } + + public boolean exists(final String path) { + return exists(path, hasListeners(path)); + } + + private void processStateChanged(WatchedEvent event) { + LOG.info("zookeeper state changed (" + event.getState() + ")"); + setCurrentState(event.getState()); + if (getShutdownTrigger()) { + return; + } + fireStateChangedEvent(event.getState()); + if (event.getState() == KeeperState.Expired) { + try { + reconnect(); + fireNewSessionEvents(); + } catch (final Exception e) { + LOG.info( + "Unable to re-establish connection. Notifying consumer of the following exception: ", + e); + fireSessionEstablishmentError(e); + } + } + } + + private void fireNewSessionEvents() { + for (final IZkStateListener stateListener : _stateListener) { + _eventThread.send(new ZkEvent("New session event sent to " + stateListener) { + + @Override public void run() throws Exception { + stateListener.handleNewSession(); + } + }); + } + } + + private void fireStateChangedEvent(final KeeperState state) { + for (final IZkStateListener stateListener : _stateListener) { + _eventThread.send(new ZkEvent("State changed to " + state + " sent to " + stateListener) { + + @Override public void run() throws Exception { + stateListener.handleStateChanged(state); + } + }); + } + } + + private void fireSessionEstablishmentError(final Throwable error) { + for (final IZkStateListener stateListener : _stateListener) { + _eventThread + .send(new ZkEvent("Session establishment error(" + error + ") sent to " + stateListener) { + + @Override public void run() throws Exception { + stateListener.handleSessionEstablishmentError(error); + } + }); + } + } + + private boolean hasListeners(String path) { + Set<IZkDataListener> dataListeners = _dataListener.get(path); + if (dataListeners != null && dataListeners.size() > 0) { + return true; + } + Set<IZkChildListener> childListeners = _childListener.get(path); + if (childListeners != null && childListeners.size() > 0) { + return true; + } + return false; + } + + public boolean deleteRecursive(String path) { + List<String> children; + try { + children = getChildren(path, false); + } catch (ZkNoNodeException e) { + return true; + } + + for (String subPath : children) { + if (!deleteRecursive(path + "/" + subPath)) { + return false; + } + } + + return delete(path); + } + + private void processDataOrChildChange(WatchedEvent event) { + final String path = event.getPath(); + + if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated + || event.getType() == EventType.NodeDeleted) { + Set<IZkChildListener> childListeners = _childListener.get(path); + if (childListeners != null && !childListeners.isEmpty()) { + fireChildChangedEvents(path, childListeners); + } + } + + if (event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted + || event.getType() == EventType.NodeCreated) { + Set<IZkDataListener> listeners = _dataListener.get(path); + if (listeners != null && !listeners.isEmpty()) { + fireDataChangedEvents(event.getPath(), listeners); + } + } + } + + private void fireDataChangedEvents(final String path, Set<IZkDataListener> listeners) { + for (final IZkDataListener listener : listeners) { + _eventThread.send(new ZkEvent("Data of " + path + " changed sent to " + listener) { + + @Override public void run() throws Exception { + // reinstall watch + exists(path, true); + try { + Object data = readData(path, null, true); + listener.handleDataChange(path, data); + } catch (ZkNoNodeException e) { + listener.handleDataDeleted(path); + } + } + }); + } + } + + private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners) { + try { + // reinstall the watch + for (final IZkChildListener listener : childListeners) { + _eventThread.send(new ZkEvent("Children of " + path + " changed sent to " + listener) { + + @Override public void run() throws Exception { + try { + // if the node doesn't exist we should listen for the root node to reappear + exists(path); + List<String> children = getChildren(path); + listener.handleChildChange(path, children); + } catch (ZkNoNodeException e) { + listener.handleChildChange(path, null); + } + } + }); + } + } catch (Exception e) { + LOG.error("Failed to fire child changed event. Unable to getChildren. ", e); + } + } + + public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) + throws ZkInterruptedException { + Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time)); + LOG.debug("Waiting until znode '" + path + "' becomes available."); + if (exists(path)) { + return true; + } + acquireEventLock(); + try { + while (!exists(path, true)) { + boolean gotSignal = getEventLock().getZNodeEventCondition().awaitUntil(timeout); + if (!gotSignal) { + return false; + } + } + return true; + } catch (InterruptedException e) { + throw new ZkInterruptedException(e); + } finally { + getEventLock().unlock(); + } + } + + protected Set<IZkDataListener> getDataListener(String path) { + return _dataListener.get(path); + } + + public void waitUntilConnected() throws ZkInterruptedException { + waitUntilConnected(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); + } + + public boolean waitUntilConnected(long time, TimeUnit timeUnit) throws ZkInterruptedException { + return waitForKeeperState(KeeperState.SyncConnected, time, timeUnit); + } + + public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit timeUnit) + throws ZkInterruptedException { + if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) { + throw new IllegalArgumentException("Must not be done in the zookeeper event thread."); + } + Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time)); + + LOG.debug("Waiting for keeper state " + keeperState); + acquireEventLock(); + try { + boolean stillWaiting = true; + while (_currentState != keeperState) { + if (!stillWaiting) { + return false; + } + stillWaiting = getEventLock().getStateChangedCondition().awaitUntil(timeout); + } + LOG.debug("State is " + _currentState); + return true; + } catch (InterruptedException e) { + throw new ZkInterruptedException(e); + } finally { + getEventLock().unlock(); + } + } + + private void acquireEventLock() { + try { + getEventLock().lockInterruptibly(); + } catch (InterruptedException e) { + throw new ZkInterruptedException(e); + } + } + + /** + * + * @param <T> + * @param callable + * @return result of Callable + * @throws ZkInterruptedException + * if operation was interrupted, or a required reconnection got interrupted + * @throws IllegalArgumentException + * if called from anything except the ZooKeeper event thread + * @throws ZkException + * if any ZooKeeper exception occurred + * @throws RuntimeException + * if any other exception occurs from invoking the Callable + */ + public <T> T retryUntilConnected(Callable<T> callable) + throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { + if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) { + throw new IllegalArgumentException("Must not be done in the zookeeper event thread."); + } + final long operationStartTime = System.currentTimeMillis(); + while (true) { + if (_closed) { + throw new IllegalStateException("ZkClient already closed!"); + } + try { + return callable.call(); + } catch (ConnectionLossException e) { + // we give the event thread some time to update the status to 'Disconnected' + Thread.yield(); + waitForRetry(); + } catch (SessionExpiredException e) { + // we give the event thread some time to update the status to 'Expired' + Thread.yield(); + waitForRetry(); + } catch (KeeperException e) { + throw ZkException.create(e); + } catch (InterruptedException e) { + throw new ZkInterruptedException(e); + } catch (Exception e) { + throw ExceptionUtil.convertToRuntimeException(e); + } + // before attempting a retry, check whether retry timeout has elapsed + if (this.operationRetryTimeoutInMillis > -1 + && (System.currentTimeMillis() - operationStartTime) + >= this.operationRetryTimeoutInMillis) { + throw new ZkTimeoutException("Operation cannot be retried because of retry timeout (" + + this.operationRetryTimeoutInMillis + " milli seconds)"); + } + } + } + + private void waitForRetry() { + if (this.operationRetryTimeoutInMillis < 0) { + this.waitUntilConnected(); + return; + } + this.waitUntilConnected(this.operationRetryTimeoutInMillis, TimeUnit.MILLISECONDS); + } + + public void setCurrentState(KeeperState currentState) { + getEventLock().lock(); + try { + _currentState = currentState; + } finally { + getEventLock().unlock(); + } + } + + /** + * Returns a mutex all zookeeper events are synchronized aginst. So in case you need to do something without getting + * any zookeeper event interruption synchronize against this mutex. Also all threads waiting on this mutex object + * will be notified on an event. + * + * @return the mutex. + */ + public ZkLock getEventLock() { + return _zkEventLock; + } + + public boolean delete(final String path) { + try { + retryUntilConnected(new Callable<Object>() { + + @Override public Object call() throws Exception { + _connection.delete(path); + return null; + } + }); + + return true; + } catch (ZkNoNodeException e) { + return false; + } + } + + private byte[] serialize(Object data) { + return _zkSerializer.serialize(data); + } + + @SuppressWarnings("unchecked") private <T extends Object> T derializable(byte[] data) { + if (data == null) { + return null; + } + return (T) _zkSerializer.deserialize(data); + } + + @SuppressWarnings("unchecked") public <T extends Object> T readData(String path) { + return (T) readData(path, false); + } + + @SuppressWarnings("unchecked") public <T extends Object> T readData(String path, + boolean returnNullIfPathNotExists) { + T data = null; + try { + data = (T) readData(path, null); + } catch (ZkNoNodeException e) { + if (!returnNullIfPathNotExists) { + throw e; + } + } + return data; + } + + @SuppressWarnings("unchecked") public <T extends Object> T readData(String path, Stat stat) { + return (T) readData(path, stat, hasListeners(path)); + } + + @SuppressWarnings("unchecked") protected <T extends Object> T readData(final String path, + final Stat stat, final boolean watch) { + byte[] data = retryUntilConnected(new Callable<byte[]>() { + + @Override public byte[] call() throws Exception { + return _connection.readData(path, stat, watch); + } + }); + return (T) derializable(data); + } + + public void writeData(String path, Object object) { + writeData(path, object, -1); + } + + /** + * Updates data of an existing znode. The current content of the znode is passed to the {@link DataUpdater} that is + * passed into this method, which returns the new content. The new content is only written back to ZooKeeper if + * nobody has modified the given znode in between. If a concurrent change has been detected the new data of the + * znode is passed to the updater once again until the new contents can be successfully written back to ZooKeeper. + * + * @param <T> + * @param path + * The path of the znode. + * @param updater + * Updater that creates the new contents. + */ + @SuppressWarnings("unchecked") public <T extends Object> void updateDataSerialized(String path, + DataUpdater<T> updater) { + Stat stat = new Stat(); + boolean retry; + do { + retry = false; + try { + T oldData = (T) readData(path, stat); + T newData = updater.update(oldData); + writeData(path, newData, stat.getVersion()); + } catch (ZkBadVersionException e) { + retry = true; + } + } while (retry); + } + + public void writeData(final String path, Object datat, final int expectedVersion) { + writeDataReturnStat(path, datat, expectedVersion); + } + + public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion) { + final byte[] data = serialize(datat); + return (Stat) retryUntilConnected(new Callable<Object>() { + + @Override public Object call() throws Exception { + Stat stat = _connection.writeDataReturnStat(path, data, expectedVersion); + return stat; + } + }); + } + + public void watchForData(final String path) { + retryUntilConnected(new Callable<Object>() { + @Override public Object call() throws Exception { + _connection.exists(path, true); + return null; + } + }); + } + + /** + * Installs a child watch for the given path. + * + * @param path + * @return the current children of the path or null if the zk node with the given path doesn't exist. + */ + public List<String> watchForChilds(final String path) { + if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) { + throw new IllegalArgumentException("Must not be done in the zookeeper event thread."); + } + return retryUntilConnected(new Callable<List<String>>() { + @Override public List<String> call() throws Exception { + exists(path, true); + try { + return getChildren(path, true); + } catch (ZkNoNodeException e) { + // ignore, the "exists" watch will listen for the parent node to appear + } + return null; + } + }); + } + + /** + * Add authentication information to the connection. This will be used to identify the user and check access to + * nodes protected by ACLs + * + * @param scheme + * @param auth + */ + public void addAuthInfo(final String scheme, final byte[] auth) { + retryUntilConnected(new Callable<Object>() { + @Override public Object call() throws Exception { + _connection.addAuthInfo(scheme, auth); + return null; + } + }); + } + + /** + * Connect to ZooKeeper. + * + * @param maxMsToWaitUntilConnected + * @param watcher + * @throws ZkInterruptedException + * if the connection timed out due to thread interruption + * @throws ZkTimeoutException + * if the connection timed out + * @throws IllegalStateException + * if the connection timed out due to thread interruption + */ + public void connect(final long maxMsToWaitUntilConnected, Watcher watcher) + throws ZkInterruptedException, ZkTimeoutException, IllegalStateException { + boolean started = false; + acquireEventLock(); + try { + setShutdownTrigger(false); + _eventThread = new ZkEventThread(_connection.getServers()); + _eventThread.start(); + _connection.connect(watcher); + + LOG.debug("Awaiting connection to Zookeeper server"); + if (!waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS)) { + throw new ZkTimeoutException( + "Unable to connect to zookeeper server within timeout: " + maxMsToWaitUntilConnected); + } + started = true; + } finally { + getEventLock().unlock(); + + // we should close the zookeeper instance, otherwise it would keep + // on trying to connect + if (!started) { + close(); + } + } + } + + public long getCreationTime(String path) { + acquireEventLock(); + try { + return _connection.getCreateTime(path); + } catch (KeeperException e) { + throw ZkException.create(e); + } catch (InterruptedException e) { + throw new ZkInterruptedException(e); + } finally { + getEventLock().unlock(); + } + } + + /** + * Close the client. + * + * @throws ZkInterruptedException + */ + public void close() throws ZkInterruptedException { + if (_closed) { + return; + } + LOG.debug("Closing ZkClient..."); + getEventLock().lock(); + try { + setShutdownTrigger(true); + _eventThread.interrupt(); + _eventThread.join(2000); + _connection.close(); + _closed = true; + } catch (InterruptedException e) { + throw new ZkInterruptedException(e); + } finally { + getEventLock().unlock(); + } + LOG.debug("Closing ZkClient...done"); + } + + private void reconnect() { + getEventLock().lock(); + try { + _connection.close(); + _connection.connect(this); + } catch (InterruptedException e) { + throw new ZkInterruptedException(e); + } finally { + getEventLock().unlock(); + } + } + + public void setShutdownTrigger(boolean triggerState) { + _shutdownTriggered = triggerState; + } + + public boolean getShutdownTrigger() { + return _shutdownTriggered; + } + + public int numberOfListeners() { + int listeners = 0; + for (Set<IZkChildListener> childListeners : _childListener.values()) { + listeners += childListeners.size(); + } + for (Set<IZkDataListener> dataListeners : _dataListener.values()) { + listeners += dataListeners.size(); + } + listeners += _stateListener.size(); + + return listeners; + } + + public List<OpResult> multi(final Iterable<Op> ops) throws ZkException { + if (ops == null) { + throw new NullPointerException("ops must not be null."); + } + + return retryUntilConnected(new Callable<List<OpResult>>() { + + @Override public List<OpResult> call() throws Exception { + return _connection.multi(ops); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/7fc03f4c/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java new file mode 100644 index 0000000..dcf7019 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java @@ -0,0 +1,85 @@ +/** + * Copyright 2010 the original author or authors. + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.helix.manager.zk.zookeeper; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.I0Itec.zkclient.exception.ZkInterruptedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * All listeners registered at the {@link ZkClient} will be notified from this event thread. This is to prevent + * dead-lock situations. The {@link ZkClient} pulls some information out of the {@link ZooKeeper} events to signal + * {@link ZkLock} conditions. Re-using the {@link ZooKeeper} event thread to also notify {@link ZkClient} listeners, + * would stop the ZkClient from receiving events from {@link ZooKeeper} as soon as one of the listeners blocks (because + * it is waiting for something). {@link ZkClient} would then for instance not be able to maintain it's connection state + * anymore. + */ +class ZkEventThread extends Thread { + private static Logger LOG = LoggerFactory.getLogger(ZkClient.class); + + private BlockingQueue<ZkEvent> _events = new LinkedBlockingQueue<ZkEvent>(); + + private static AtomicInteger _eventId = new AtomicInteger(0); + + public static abstract class ZkEvent { + + private String _description; + + public ZkEvent(String description) { + _description = description; + } + + public abstract void run() throws Exception; + + @Override public String toString() { + return "ZkEvent[" + _description + "]"; + } + } + + ZkEventThread(String name) { + setDaemon(true); + setName("ZkClient-EventThread-" + getId() + "-" + name); + } + + @Override public void run() { + LOG.info("Starting ZkClient event thread."); + try { + while (!isInterrupted()) { + ZkEvent zkEvent = _events.take(); + int eventId = _eventId.incrementAndGet(); + LOG.debug("Delivering event #" + eventId + " " + zkEvent); + try { + zkEvent.run(); + } catch (InterruptedException e) { + interrupt(); + } catch (ZkInterruptedException e) { + interrupt(); + } catch (Throwable e) { + LOG.error("Error handling event " + zkEvent, e); + } + LOG.debug("Delivering event #" + eventId + " done"); + } + } catch (InterruptedException e) { + LOG.info("Terminate ZkClient event thread."); + } + } + + public void send(ZkEvent event) { + if (!isInterrupted()) { + LOG.debug("New event: " + event); + _events.add(event); + } + } +}
