Repository: helix Updated Branches: refs/heads/master 281f5d1ec -> 7bb55742e
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java index b57ca87..9c18d09 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java @@ -57,6 +57,7 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -70,8 +71,8 @@ public class ZkClient implements Watcher { private static Logger LOG = LoggerFactory.getLogger(ZkClient.class); private static long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds - protected final IZkConnection _connection; - protected final long _operationRetryTimeoutInMillis; + private final IZkConnection _connection; + private final long _operationRetryTimeoutInMillis; private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Set<IZkDataListenerEntry>> _dataListener = @@ -87,7 +88,6 @@ public class ZkClient implements Watcher { private PathBasedZkSerializer _pathBasedZkSerializer; private ZkClientMonitor _monitor; - private class IZkDataListenerEntry { final IZkDataListener _dataListener; final boolean _prefetchData; @@ -130,7 +130,6 @@ public class ZkClient implements Watcher { } } - protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, String monitorInstanceName, boolean monitorRootPathOnly) { @@ -140,6 +139,7 @@ public class ZkClient implements Watcher { _connection = zkConnection; _pathBasedZkSerializer = zkSerializer; _operationRetryTimeoutInMillis = operationRetryTimeout; + connect(connectionTimeout, this); // initiate monitor @@ -510,7 +510,7 @@ public class ZkClient implements Watcher { String actualPath = retryUntilConnected(new Callable<String>() { @Override public String call() throws Exception { - return _connection.create(path, data, acl, mode); + return getConnection().create(path, data, acl, mode); } }); record(path, data, startT, ZkClientMonitor.AccessType.WRITE); @@ -695,7 +695,7 @@ public class ZkClient implements Watcher { List<String> children = retryUntilConnected(new Callable<List<String>>() { @Override public List<String> call() throws Exception { - return _connection.getChildren(path, watch); + return getConnection().getChildren(path, watch); } }); record(path, null, startT, ZkClientMonitor.AccessType.READ); @@ -737,7 +737,7 @@ public class ZkClient implements Watcher { boolean exists = retryUntilConnected(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - return _connection.exists(path, watch); + return getConnection().exists(path, watch); } }); record(path, null, startT, ZkClientMonitor.AccessType.READ); @@ -759,7 +759,7 @@ public class ZkClient implements Watcher { Stat stat = retryUntilConnected(new Callable<Stat>() { @Override public Stat call() throws Exception { - Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false); + Stat stat = ((ZkConnection) getConnection()).getZookeeper().exists(path, false); return stat; } }); @@ -776,14 +776,14 @@ public class ZkClient implements Watcher { } } - private void processStateChanged(WatchedEvent event) { + protected void processStateChanged(WatchedEvent event) { LOG.info("zookeeper state changed (" + event.getState() + ")"); setCurrentState(event.getState()); if (getShutdownTrigger()) { return; } fireStateChangedEvent(event.getState()); - if (event.getState() == KeeperState.Expired) { + if (isManagingZkConnection() && event.getState() == KeeperState.Expired) { reconnectOnExpiring(); } } @@ -794,7 +794,7 @@ public class ZkClient implements Watcher { new ExponentialBackoffStrategy(MAX_RECONNECT_INTERVAL_MS, true); Exception reconnectException = new ZkException("Shutdown triggered."); - while (!_closed) { + while (!isClosed()) { try { reconnect(); fireNewSessionEvents(); @@ -820,6 +820,19 @@ public class ZkClient implements Watcher { fireSessionEstablishmentError(reconnectException); } + private void reconnect() { + getEventLock().lock(); + try { + IZkConnection connection = getConnection(); + connection.close(); + connection.connect(this); + } catch (InterruptedException e) { + throw new ZkInterruptedException(e); + } finally { + getEventLock().unlock(); + } + } + private void fireNewSessionEvents() { for (final IZkStateListener stateListener : _stateListener) { _eventThread.send(new ZkEvent("New session event sent to " + stateListener) { @@ -831,7 +844,7 @@ public class ZkClient implements Watcher { } } - private void fireStateChangedEvent(final KeeperState state) { + protected void fireStateChangedEvent(final KeeperState state) { for (final IZkStateListener stateListener : _stateListener) { _eventThread.send(new ZkEvent("State changed to " + state + " sent to " + stateListener) { @@ -1073,7 +1086,7 @@ public class ZkClient implements Watcher { } final long operationStartTime = System.currentTimeMillis(); while (true) { - if (_closed) { + if (isClosed()) { throw new IllegalStateException("ZkClient already closed!"); } try { @@ -1147,7 +1160,7 @@ public class ZkClient implements Watcher { @Override public Object call() throws Exception { - _connection.delete(path); + getConnection().delete(path); return null; } }); @@ -1226,7 +1239,7 @@ public class ZkClient implements Watcher { data = retryUntilConnected(new Callable<byte[]>() { @Override public byte[] call() throws Exception { - return _connection.readData(path, stat, watch); + return getConnection().readData(path, stat, watch); } }); record(path, data, startT, ZkClientMonitor.AccessType.READ); @@ -1299,7 +1312,7 @@ public class ZkClient implements Watcher { checkDataSizeLimit(data); final Stat stat = (Stat) retryUntilConnected(new Callable<Object>() { @Override public Object call() throws Exception { - return _connection.writeDataReturnStat(path, data, expectedVersion); + return getConnection().writeDataReturnStat(path, data, expectedVersion); } }); record(path, data, startT, ZkClientMonitor.AccessType.WRITE); @@ -1326,7 +1339,7 @@ public class ZkClient implements Watcher { final byte[] data = (datat == null ? null : serialize(datat, path)); retryUntilConnected(new Callable<Object>() { @Override public Object call() throws Exception { - ((ZkConnection) _connection).getZookeeper().create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, + ((ZkConnection) getConnection()).getZookeeper().create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, // Arrays.asList(DEFAULT_ACL), mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, data == null ? 0 : data.length, false)); @@ -1342,7 +1355,7 @@ public class ZkClient implements Watcher { final byte[] data = serialize(datat, path); retryUntilConnected(new Callable<Object>() { @Override public Object call() throws Exception { - ((ZkConnection) _connection).getZookeeper().setData(path, data, version, cb, + ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, data == null ? 0 : data.length, false)); return null; @@ -1354,7 +1367,7 @@ public class ZkClient implements Watcher { final long startT = System.currentTimeMillis(); retryUntilConnected(new Callable<Object>() { @Override public Object call() throws Exception { - ((ZkConnection) _connection).getZookeeper().getData(path, null, cb, + ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true)); return null; } @@ -1365,7 +1378,7 @@ public class ZkClient implements Watcher { final long startT = System.currentTimeMillis(); retryUntilConnected(new Callable<Object>() { @Override public Object call() throws Exception { - ((ZkConnection) _connection).getZookeeper().exists(path, null, cb, + ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true)); return null; } @@ -1376,7 +1389,7 @@ public class ZkClient implements Watcher { final long startT = System.currentTimeMillis(); retryUntilConnected(new Callable<Object>() { @Override public Object call() throws Exception { - ((ZkConnection) _connection).getZookeeper().delete(path, -1, cb, + ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false)); return null; } @@ -1394,7 +1407,7 @@ public class ZkClient implements Watcher { public void watchForData(final String path) { retryUntilConnected(new Callable<Object>() { @Override public Object call() throws Exception { - _connection.exists(path, true); + getConnection().exists(path, true); return null; } }); @@ -1433,7 +1446,7 @@ public class ZkClient implements Watcher { public void addAuthInfo(final String scheme, final byte[] auth) { retryUntilConnected(new Callable<Object>() { @Override public Object call() throws Exception { - _connection.addAuthInfo(scheme, auth); + getConnection().addAuthInfo(scheme, auth); return null; } }); @@ -1453,22 +1466,34 @@ public class ZkClient implements Watcher { */ public void connect(final long maxMsToWaitUntilConnected, Watcher watcher) throws ZkInterruptedException, ZkTimeoutException, IllegalStateException { - if (_closed) { + if (isClosed()) { throw new IllegalStateException("ZkClient already closed!"); } boolean started = false; acquireEventLock(); try { setShutdownTrigger(false); - _eventThread = new ZkEventThread(_connection.getServers()); + + IZkConnection zkConnection = getConnection(); + _eventThread = new ZkEventThread(zkConnection.getServers()); _eventThread.start(); - _connection.connect(watcher); - LOG.debug("Awaiting connection to Zookeeper server"); - if (!waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS)) { - throw new ZkTimeoutException( - "Unable to connect to zookeeper server within timeout: " + maxMsToWaitUntilConnected); + if (isManagingZkConnection()) { + zkConnection.connect(watcher); + LOG.debug("Awaiting connection to Zookeeper server"); + if (!waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS)) { + throw new ZkTimeoutException( + "Unable to connect to zookeeper server within timeout: " + maxMsToWaitUntilConnected); + } + } else { + // if the client is not managing connection, the input connection is supposed to connect. + if (isConnectionClosed()) { + throw new HelixException( + "Unable to connect to zookeeper server with the specified ZkConnection"); + } + setCurrentState(KeeperState.SyncConnected); } + started = true; } finally { getEventLock().unlock(); @@ -1484,7 +1509,7 @@ public class ZkClient implements Watcher { public long getCreationTime(String path) { acquireEventLock(); try { - return _connection.getCreateTime(path); + return getConnection().getCreateTime(path); } catch (KeeperException e) { throw ZkException.create(e); } catch (InterruptedException e) { @@ -1495,7 +1520,7 @@ public class ZkClient implements Watcher { } public String getServers() { - return _connection.getServers(); + return getConnection().getServers(); } /** @@ -1509,15 +1534,18 @@ public class ZkClient implements Watcher { LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls)); } getEventLock().lock(); + IZkConnection connection = getConnection(); try { - if (_connection == null || _closed) { + if (connection == null || _closed) { return; } - LOG.info("Closing zkclient: " + ((ZkConnection) _connection).getZookeeper()); setShutdownTrigger(true); _eventThread.interrupt(); _eventThread.join(2000); - _connection.close(); + if (isManagingZkConnection()) { + LOG.info("Closing zkclient: " + ((ZkConnection) connection).getZookeeper()); + connection.close(); + } _closed = true; // send state change notification to unlock any wait @@ -1529,7 +1557,7 @@ public class ZkClient implements Watcher { * Workaround for HELIX-264: calling ZkClient#close() in its own eventThread context will * throw ZkInterruptedException and skip ZkConnection#close() */ - if (_connection != null) { + if (connection != null) { try { /** * ZkInterruptedException#construct() honors InterruptedException by calling @@ -1537,7 +1565,9 @@ public class ZkClient implements Watcher { * zk-connection */ Thread.interrupted(); - _connection.close(); + if (isManagingZkConnection()) { + connection.close(); + } /** * restore interrupted status of current thread */ @@ -1556,26 +1586,20 @@ public class ZkClient implements Watcher { } public boolean isClosed() { - return _closed; - } - - public boolean isConnectionClosed() { - return (_connection == null || _connection.getZookeeperState() == null || - !_connection.getZookeeperState().isAlive()); - } - - private void reconnect() { - getEventLock().lock(); try { - _connection.close(); - _connection.connect(this); - } catch (InterruptedException e) { - throw new ZkInterruptedException(e); + getEventLock().lock(); + return _closed; } finally { getEventLock().unlock(); } } + public boolean isConnectionClosed() { + IZkConnection connection = getConnection(); + return (connection == null || connection.getZookeeperState() == null || + !connection.getZookeeperState().isAlive()); + } + public void setShutdownTrigger(boolean triggerState) { _shutdownTriggered = triggerState; } @@ -1605,11 +1629,29 @@ public class ZkClient implements Watcher { return retryUntilConnected(new Callable<List<OpResult>>() { @Override public List<OpResult> call() throws Exception { - return _connection.multi(ops); + return getConnection().multi(ops); } }); } + /** + * @return true if this ZkClient is managing the ZkConnection. + */ + protected boolean isManagingZkConnection() { + return true; + } + + public long getSessionId() { + ZkConnection zkConnection = ((ZkConnection) getConnection()); + ZooKeeper zk = zkConnection.getZookeeper(); + if (zk == null) { + throw new HelixException( + "ZooKeeper connection information is not available now. ZkClient might be disconnected."); + } else { + return zkConnection.getZookeeper().getSessionId(); + } + } + // operations to update monitor's counters private void record(String path, byte[] data, long startTimeMilliSec, ZkClientMonitor.AccessType accessType) { if (_monitor != null) { http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java index d458c52..794e9e1 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java +++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java @@ -31,7 +31,8 @@ import org.apache.helix.ZNRecord; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.manager.zk.client.HelixZkClient; +import org.apache.helix.manager.zk.client.SharedZkClientFactory; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.slf4j.Logger; @@ -129,14 +130,15 @@ public class HelixCustomCodeRunner { StateMachineEngine stateMach = _manager.getStateMachineEngine(); stateMach.registerStateModelFactory(LEADER_STANDBY, _stateModelFty, _resourceName); - ZkClient zkClient = null; + HelixZkClient zkClient = null; try { // manually add ideal state for participant leader using LeaderStandby // model + HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); + clientConfig.setZkSerializer(new ZNRecordSerializer()); + zkClient = SharedZkClientFactory + .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig); - zkClient = - new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, - ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer()); HelixDataAccessor accessor = new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor<ZNRecord>( zkClient)); @@ -161,9 +163,7 @@ public class HelixCustomCodeRunner { LOG.info("Set idealState for participantLeader:" + _resourceName + ", idealState:" + idealState); } finally { - if (zkClient != null && zkClient.getConnection() != null) - - { + if (zkClient != null && !zkClient.isClosed()) { zkClient.close(); } } http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java index 80a7820..b1d6582 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java +++ b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java @@ -24,15 +24,17 @@ import java.util.UUID; import org.apache.helix.PropertyPathBuilder; import org.apache.helix.ZNRecord; import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.ZkClient; -import org.apache.helix.model.Message; +import org.apache.helix.manager.zk.client.HelixZkClient; +import org.apache.helix.manager.zk.client.SharedZkClientFactory; import org.apache.helix.model.LiveInstance.LiveInstanceProperty; +import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageState; import org.apache.helix.model.Message.MessageType; public class MessagePoster { public void post(String zkServer, Message message, String clusterName, String instanceName) { - ZkClient client = new ZkClient(zkServer); + HelixZkClient client = SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig( + zkServer)); client.setZkSerializer(new ZNRecordSerializer()); String path = PropertyPathBuilder.instanceMessage(clusterName, instanceName, message.getId()); client.delete(path); http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java index dd6f3a9..908bba5 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java @@ -35,13 +35,15 @@ import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.manager.zk.client.HelixZkClient; +import org.apache.helix.manager.zk.client.SharedZkClientFactory; import org.apache.helix.store.PropertyJsonComparator; import org.apache.helix.store.PropertyJsonSerializer; import org.apache.helix.store.PropertyStoreException; import org.apache.helix.tools.TestCommand.CommandType; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.zookeeper.data.Stat; /** * a test is structured logically as a list of commands a command has three parts: COMMAND @@ -747,10 +749,11 @@ public class TestExecutor { String zkAddr, CountDownLatch countDown) { final Map<TestCommand, Boolean> testResults = new ConcurrentHashMap<TestCommand, Boolean>(); - ZkClient zkClient = null; - zkClient = new ZkClient(zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT); - zkClient.setZkSerializer(new ZNRecordSerializer()); + HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); + clientConfig.setZkSerializer(new ZNRecordSerializer()); + HelixZkClient zkClient = SharedZkClientFactory + .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig); // sort on trigger's start time, stable sort Collections.sort(commandList, new Comparator<TestCommand>() { @@ -765,7 +768,7 @@ public class TestExecutor { TestTrigger trigger = command._trigger; command._startTimestamp = System.currentTimeMillis() + trigger._startTime; - new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, zkClient, + new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, (ZkClient) zkClient, testResults)).start(); } http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java index c171b73..cf7f22e 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java +++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java @@ -38,14 +38,15 @@ import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.helix.manager.zk.ByteArraySerializer; -import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.manager.zk.client.HelixZkClient; +import org.apache.helix.manager.zk.client.SharedZkClientFactory; /** * Dumps the Zookeeper file structure on to Disk */ @SuppressWarnings("static-access") public class ZKDumper { - private ZkClient client; + private HelixZkClient client; private FilenameFilter filter; static Options options; private String suffix = ""; @@ -110,7 +111,8 @@ public class ZKDumper { } public ZKDumper(String zkAddress) { - client = new ZkClient(zkAddress, ZkClient.DEFAULT_CONNECTION_TIMEOUT); + client = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress)); ZkSerializer zkSerializer = new ByteArraySerializer(); client.setZkSerializer(zkSerializer); http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java index 5bd955a..805847c 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java +++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java @@ -37,11 +37,12 @@ import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.manager.zk.ByteArraySerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.manager.zk.ZkClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.helix.manager.zk.client.HelixZkClient; +import org.apache.helix.manager.zk.client.SharedZkClientFactory; import org.apache.zookeeper.common.PathUtils; import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Tool for copying a zk/file path to another zk/file path @@ -99,7 +100,7 @@ public class ZkCopy { * @param dstRootPath * @param paths */ - private static void copy(ZkClient srcClient, String srcRootPath, ZkClient dstClient, + private static void copy(HelixZkClient srcClient, String srcRootPath, HelixZkClient dstClient, String dstRootPath, List<String> paths) { BaseDataAccessor<Object> srcAccessor = new ZkBaseDataAccessor<Object>(srcClient); List<String> readPaths = new ArrayList<String>(); @@ -146,7 +147,8 @@ public class ZkCopy { } } - private static void zkCopy(ZkClient srcClient, String srcRootPath, ZkClient dstClient, String dstRootPath) { + private static void zkCopy(HelixZkClient srcClient, String srcRootPath, HelixZkClient dstClient, + String dstRootPath) { // Strip off tailing "/" if (!srcRootPath.equals("/") && srcRootPath.endsWith("/")) { srcRootPath = srcRootPath.substring(0, srcRootPath.length() - 1); @@ -218,21 +220,21 @@ public class ZkCopy { String srcZkAddr = srcUri.getAuthority(); String dstZkAddr = dstUri.getAuthority(); - ZkClient srcClient = null; - ZkClient dstClient = null; + HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); + HelixZkClient srcClient = null; + HelixZkClient dstClient = null; try { if (srcZkAddr.equals(dstZkAddr)) { - srcClient = - dstClient = - new ZkClient(srcZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, - ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer()); + clientConfig.setZkSerializer(new ByteArraySerializer()); + srcClient = dstClient = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(srcZkAddr), clientConfig); } else { - srcClient = - new ZkClient(srcZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, - ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer()); - dstClient = - new ZkClient(dstZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, - ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer()); + clientConfig.setZkSerializer(new ByteArraySerializer()); + srcClient = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(srcZkAddr), clientConfig); + clientConfig.setZkSerializer(new ByteArraySerializer()); + dstClient = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(dstZkAddr), clientConfig); } String srcPath = srcUri.getPath(); String dstPath = dstUri.getPath(); http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java index d3f447a..63d87eb 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java @@ -427,7 +427,7 @@ public class TestResourceGroupEndtoEnd extends ZkTestBase { @Override public ZkClient getZkClient() { - return _zkclient; + return (ZkClient) _zkclient; } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java index ffa2cb2..96f4a88 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java @@ -89,7 +89,7 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable @Override public ZkClient getZkClient() { - return _zkclient; + return (ZkClient) _zkclient; } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java index 1cce08d..b186a1a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java @@ -83,7 +83,7 @@ public class ClusterDistributedController extends ZKHelixManager implements Runn @Override public ZkClient getZkClient() { - return _zkclient; + return (ZkClient) _zkclient; } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java index 2bd2630..362709a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java @@ -128,7 +128,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable, @Override public ZkClient getZkClient() { - return _zkclient; + return (ZkClient) _zkclient; } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java new file mode 100644 index 0000000..d0cf004 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java @@ -0,0 +1,294 @@ +package org.apache.helix.manager.zk; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.lang.management.ManagementFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import org.I0Itec.zkclient.IZkDataListener; +import org.I0Itec.zkclient.IZkStateListener; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.monitoring.mbeans.MBeanRegistrar; +import org.apache.helix.monitoring.mbeans.MonitorDomainNames; +import org.apache.helix.monitoring.mbeans.ZkClientMonitor; +import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.AssertJUnit; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestRawZkClient extends ZkUnitTestBase { + private static Logger LOG = LoggerFactory.getLogger(TestRawZkClient.class); + + ZkClient _zkClient; + + @BeforeClass + public void beforeClass() { + _zkClient = new ZkClient(ZK_ADDR); + _zkClient.setZkSerializer(new ZNRecordSerializer()); + } + + @AfterClass + public void afterClass() { + _zkClient.close(); + } + + @Test() + void testGetStat() { + String path = "/tmp/getStatTest"; + _zkClient.deleteRecursively(path); + + Stat stat, newStat; + stat = _zkClient.getStat(path); + AssertJUnit.assertNull(stat); + _zkClient.createPersistent(path, true); + + stat = _zkClient.getStat(path); + AssertJUnit.assertNotNull(stat); + + newStat = _zkClient.getStat(path); + AssertJUnit.assertEquals(stat, newStat); + + _zkClient.writeData(path, new ZNRecord("Test")); + newStat = _zkClient.getStat(path); + AssertJUnit.assertNotSame(stat, newStat); + } + + @Test() + void testSessionExpire() throws Exception { + IZkStateListener listener = new IZkStateListener() { + + @Override + public void handleStateChanged(KeeperState state) throws Exception { + System.out.println("In Old connection New state " + state); + } + + @Override + public void handleNewSession() throws Exception { + System.out.println("In Old connection New session"); + } + + @Override + public void handleSessionEstablishmentError(Throwable var1) throws Exception { + } + }; + + _zkClient.subscribeStateChanges(listener); + ZkConnection connection = ((ZkConnection) _zkClient.getConnection()); + ZooKeeper zookeeper = connection.getZookeeper(); + System.out.println("old sessionId= " + zookeeper.getSessionId()); + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + System.out.println("In New connection In process event:" + event); + } + }; + ZooKeeper newZookeeper = + new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher, + zookeeper.getSessionId(), zookeeper.getSessionPasswd()); + Thread.sleep(3000); + System.out.println("New sessionId= " + newZookeeper.getSessionId()); + Thread.sleep(3000); + newZookeeper.close(); + Thread.sleep(10000); + connection = ((ZkConnection) _zkClient.getConnection()); + zookeeper = connection.getZookeeper(); + System.out.println("After session expiry sessionId= " + zookeeper.getSessionId()); + } + + @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*") + void testDataSizeLimit() { + ZNRecord data = new ZNRecord(new String(new char[1024 * 1024])); + _zkClient.writeData("/test", data, -1); + } + + @Test + public void testZkClientMonitor() throws Exception { + final String TEST_TAG = "test_monitor"; + final String TEST_KEY = "test_key"; + final String TEST_DATA = "testData"; + final String TEST_ROOT = "/my_cluster/IDEALSTATES"; + final String TEST_NODE = "/test_zkclient_monitor"; + final String TEST_PATH = TEST_ROOT + TEST_NODE; + + ZkClient.Builder builder = new ZkClient.Builder(); + builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG) + .setMonitorRootPathOnly(false); + ZkClient zkClient = builder.build(); + + final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, TEST_PATH).length; + + if (_zkClient.exists(TEST_PATH)) { + _zkClient.delete(TEST_PATH); + } + if (!_zkClient.exists(TEST_ROOT)) { + _zkClient.createPersistent(TEST_ROOT, true); + } + + MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer(); + + ObjectName name = MBeanRegistrar + .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE, + TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY); + ObjectName rootname = MBeanRegistrar + .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE, + TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH, + "Root"); + ObjectName idealStatename = MBeanRegistrar + .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE, + TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH, + "IdealStates"); + Assert.assertTrue(beanServer.isRegistered(rootname)); + Assert.assertTrue(beanServer.isRegistered(idealStatename)); + + // Test exists + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 0); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"), 0); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max"), 0); + zkClient.exists(TEST_ROOT); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1); + Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter") >= 0); + Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max") >= 0); + + // Test create + Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 0); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), 0); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 0); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"), 0); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter"), 0); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max"), 0); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter"), + 0); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max"), 0); + zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 1); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), + TEST_DATA_SIZE); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 1); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"), + TEST_DATA_SIZE); + long origWriteTotalLatencyCounter = + (long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter"); + Assert.assertTrue(origWriteTotalLatencyCounter >= 0); + Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max") >= 0); + long origIdealStatesWriteTotalLatencyCounter = + (long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter"); + Assert.assertTrue(origIdealStatesWriteTotalLatencyCounter >= 0); + Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max") >= 0); + + // Test read + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), 0); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 0); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), 0); + long origReadTotalLatencyCounter = + (long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"); + long origIdealStatesReadTotalLatencyCounter = + (long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter"); + Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max"), 0); + zkClient.readData(TEST_PATH, new Stat()); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 2); + Assert + .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 1); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), + TEST_DATA_SIZE); + Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter") + >= origReadTotalLatencyCounter); + Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter") + >= origIdealStatesReadTotalLatencyCounter); + Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max") >= 0); + zkClient.getChildren(TEST_PATH); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 3); + Assert + .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 2); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), + TEST_DATA_SIZE); + zkClient.getStat(TEST_PATH); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 4); + Assert + .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 3); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), + TEST_DATA_SIZE); + zkClient.readDataAndStat(TEST_PATH, new Stat(), true); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 5); + + ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler = + new ZkAsyncCallbacks.ExistsCallbackHandler(); + zkClient.asyncExists(TEST_PATH, callbackHandler); + callbackHandler.waitForSuccess(); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 6); + + // Test write + zkClient.writeData(TEST_PATH, TEST_DATA); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 2); + Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), + TEST_DATA_SIZE * 2); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 2); + Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"), + TEST_DATA_SIZE * 2); + Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter") + >= origWriteTotalLatencyCounter); + Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter") + >= origIdealStatesWriteTotalLatencyCounter); + + // Test data change count + final Lock lock = new ReentrantLock(); + final Condition callbackFinish = lock.newCondition(); + zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + lock.lock(); + try { + callbackFinish.signal(); + } finally { + lock.unlock(); + } + } + }); + lock.lock(); + _zkClient.delete(TEST_PATH); + Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS)); + Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 1); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java deleted file mode 100644 index a18dd29..0000000 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java +++ /dev/null @@ -1,294 +0,0 @@ -package org.apache.helix.manager.zk; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.lang.management.ManagementFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import org.I0Itec.zkclient.IZkDataListener; -import org.I0Itec.zkclient.IZkStateListener; -import org.I0Itec.zkclient.ZkConnection; -import org.apache.helix.HelixException; -import org.apache.helix.ZNRecord; -import org.apache.helix.ZkUnitTestBase; -import org.apache.helix.monitoring.mbeans.MBeanRegistrar; -import org.apache.helix.monitoring.mbeans.MonitorDomainNames; -import org.apache.helix.monitoring.mbeans.ZkClientMonitor; -import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.AssertJUnit; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -public class TestZkClient extends ZkUnitTestBase { - private static Logger LOG = LoggerFactory.getLogger(TestZkClient.class); - - ZkClient _zkClient; - - @BeforeClass - public void beforeClass() { - _zkClient = new ZkClient(ZK_ADDR); - _zkClient.setZkSerializer(new ZNRecordSerializer()); - } - - @AfterClass - public void afterClass() { - _zkClient.close(); - } - - @Test() - void testGetStat() { - String path = "/tmp/getStatTest"; - _zkClient.deleteRecursively(path); - - Stat stat, newStat; - stat = _zkClient.getStat(path); - AssertJUnit.assertNull(stat); - _zkClient.createPersistent(path, true); - - stat = _zkClient.getStat(path); - AssertJUnit.assertNotNull(stat); - - newStat = _zkClient.getStat(path); - AssertJUnit.assertEquals(stat, newStat); - - _zkClient.writeData(path, new ZNRecord("Test")); - newStat = _zkClient.getStat(path); - AssertJUnit.assertNotSame(stat, newStat); - } - - @Test() - void testSessionExpire() throws Exception { - IZkStateListener listener = new IZkStateListener() { - - @Override - public void handleStateChanged(KeeperState state) throws Exception { - System.out.println("In Old connection New state " + state); - } - - @Override - public void handleNewSession() throws Exception { - System.out.println("In Old connection New session"); - } - - @Override - public void handleSessionEstablishmentError(Throwable var1) throws Exception { - } - }; - - _zkClient.subscribeStateChanges(listener); - ZkConnection connection = ((ZkConnection) _zkClient.getConnection()); - ZooKeeper zookeeper = connection.getZookeeper(); - System.out.println("old sessionId= " + zookeeper.getSessionId()); - Watcher watcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - System.out.println("In New connection In process event:" + event); - } - }; - ZooKeeper newZookeeper = - new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher, - zookeeper.getSessionId(), zookeeper.getSessionPasswd()); - Thread.sleep(3000); - System.out.println("New sessionId= " + newZookeeper.getSessionId()); - Thread.sleep(3000); - newZookeeper.close(); - Thread.sleep(10000); - connection = ((ZkConnection) _zkClient.getConnection()); - zookeeper = connection.getZookeeper(); - System.out.println("After session expiry sessionId= " + zookeeper.getSessionId()); - } - - @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*") - void testDataSizeLimit() { - ZNRecord data = new ZNRecord(new String(new char[1024 * 1024])); - _zkClient.writeData("/test", data, -1); - } - - @Test - public void testZkClientMonitor() throws Exception { - final String TEST_TAG = "test_monitor"; - final String TEST_KEY = "test_key"; - final String TEST_DATA = "testData"; - final String TEST_ROOT = "/my_cluster/IDEALSTATES"; - final String TEST_NODE = "/test_zkclient_monitor"; - final String TEST_PATH = TEST_ROOT + TEST_NODE; - - ZkClient.Builder builder = new ZkClient.Builder(); - builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG) - .setMonitorRootPathOnly(false); - ZkClient zkClient = builder.build(); - - final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, TEST_PATH).length; - - if (_zkClient.exists(TEST_PATH)) { - _zkClient.delete(TEST_PATH); - } - if (!_zkClient.exists(TEST_ROOT)) { - _zkClient.createPersistent(TEST_ROOT, true); - } - - MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer(); - - ObjectName name = MBeanRegistrar - .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE, - TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY); - ObjectName rootname = MBeanRegistrar - .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE, - TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH, - "Root"); - ObjectName idealStatename = MBeanRegistrar - .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE, - TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH, - "IdealStates"); - Assert.assertTrue(beanServer.isRegistered(rootname)); - Assert.assertTrue(beanServer.isRegistered(idealStatename)); - - // Test exists - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max"), 0); - zkClient.exists(TEST_ROOT); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1); - Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter") >= 0); - Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max") >= 0); - - // Test create - Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max"), 0); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter"), - 0); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max"), 0); - zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 1); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), - TEST_DATA_SIZE); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 1); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"), - TEST_DATA_SIZE); - long origWriteTotalLatencyCounter = - (long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter"); - Assert.assertTrue(origWriteTotalLatencyCounter >= 0); - Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max") >= 0); - long origIdealStatesWriteTotalLatencyCounter = - (long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter"); - Assert.assertTrue(origIdealStatesWriteTotalLatencyCounter >= 0); - Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max") >= 0); - - // Test read - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 0); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), 0); - long origReadTotalLatencyCounter = - (long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"); - long origIdealStatesReadTotalLatencyCounter = - (long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter"); - Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max"), 0); - zkClient.readData(TEST_PATH, new Stat()); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 2); - Assert - .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 1); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), - TEST_DATA_SIZE); - Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter") - >= origReadTotalLatencyCounter); - Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter") - >= origIdealStatesReadTotalLatencyCounter); - Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max") >= 0); - zkClient.getChildren(TEST_PATH); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 3); - Assert - .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 2); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), - TEST_DATA_SIZE); - zkClient.getStat(TEST_PATH); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 4); - Assert - .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 3); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), - TEST_DATA_SIZE); - zkClient.readDataAndStat(TEST_PATH, new Stat(), true); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 5); - - ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler = - new ZkAsyncCallbacks.ExistsCallbackHandler(); - zkClient.asyncExists(TEST_PATH, callbackHandler); - callbackHandler.waitForSuccess(); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 6); - - // Test write - zkClient.writeData(TEST_PATH, TEST_DATA); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 2); - Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), - TEST_DATA_SIZE * 2); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 2); - Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"), - TEST_DATA_SIZE * 2); - Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter") - >= origWriteTotalLatencyCounter); - Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter") - >= origIdealStatesWriteTotalLatencyCounter); - - // Test data change count - final Lock lock = new ReentrantLock(); - final Condition callbackFinish = lock.newCondition(); - zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() { - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - lock.lock(); - try { - callbackFinish.signal(); - } finally { - lock.unlock(); - } - } - }); - lock.lock(); - _zkClient.delete(TEST_PATH); - Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS)); - Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 1); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java index 1f72948..691623e 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java @@ -32,6 +32,7 @@ import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.SystemPropertyKeys; import org.apache.helix.TestHelper; +import org.apache.helix.manager.zk.client.HelixZkClient; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.tools.ClusterSetup; import org.apache.zookeeper.WatchedEvent; @@ -94,7 +95,7 @@ public class TestZkReconnect { // 1. shutdown zkServer and check if handler trigger callback zkServer.shutdown(); // Simulate a retry in ZkClient that will not succeed - injectExpire(controller._zkclient); + injectExpire((ZkClient) controller._zkclient); Assert.assertFalse(controller._zkclient.waitUntilConnected(5000, TimeUnit.MILLISECONDS)); // While retrying, onDisconnectedFlag = false Assert.assertFalse(onDisconnectedFlag.get()); @@ -102,7 +103,7 @@ public class TestZkReconnect { // 2. restart zkServer and check if handler will recover connection zkServer.start(); Assert.assertTrue(controller._zkclient - .waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)); + .waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)); Assert.assertTrue(controller.isConnected()); // New propertyStore should be in good state http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java new file mode 100644 index 0000000..67e2731 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java @@ -0,0 +1,197 @@ +package org.apache.helix.manager.zk.client; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.concurrent.TimeUnit; + +import org.I0Itec.zkclient.IZkDataListener; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.helix.HelixException; +import org.apache.helix.TestHelper; +import org.apache.helix.ZkUnitTestBase; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestHelixZkClient extends ZkUnitTestBase { + final String TEST_NODE = "/test_helix_zkclient"; + + @Test public void testZkConnectionManager() { + final String TEST_ROOT = "/testZkConnectionManager/IDEALSTATES"; + final String TEST_PATH = TEST_ROOT + TEST_NODE; + + ZkConnectionManager zkConnectionManager = + new ZkConnectionManager(new ZkConnection(ZK_ADDR), HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, + null); + Assert.assertTrue(zkConnectionManager.waitUntilConnected(1, TimeUnit.SECONDS)); + + // This client can write/read from ZK + zkConnectionManager.createPersistent(TEST_PATH, true); + zkConnectionManager.writeData(TEST_PATH, "Test"); + Assert.assertTrue(zkConnectionManager.readData(TEST_PATH) != null); + zkConnectionManager.deleteRecursively(TEST_ROOT); + + // This client can be shared, and cannot close when sharing + SharedZkClient sharedZkClient = + new SharedZkClient(zkConnectionManager, new HelixZkClient.ZkClientConfig(), null); + try { + zkConnectionManager.close(); + Assert.fail("Dedicated ZkClient cannot be closed while sharing!"); + } catch (HelixException hex) { + // expected + } + + // This client can be closed normally when sharing ends + sharedZkClient.close(); + Assert.assertTrue(sharedZkClient.isClosed()); + Assert.assertFalse(sharedZkClient.waitUntilConnected(100, TimeUnit.MILLISECONDS)); + + zkConnectionManager.close(); + Assert.assertTrue(zkConnectionManager.isClosed()); + Assert.assertFalse(zkConnectionManager.waitUntilConnected(100, TimeUnit.MILLISECONDS)); + + // Sharing a closed dedicated ZkClient shall fail + try { + new SharedZkClient(zkConnectionManager, new HelixZkClient.ZkClientConfig(), null); + Assert.fail("Sharing a closed dedicated ZkClient shall fail."); + } catch (HelixException hex) { + // expected + } + } + + @Test(dependsOnMethods = "testZkConnectionManager") public void testSharingZkClient() + throws Exception { + final String TEST_ROOT = "/testSharedZkClient/IDEALSTATES"; + final String TEST_PATH = TEST_ROOT + TEST_NODE; + + // A factory just for this tests, this for avoiding the impact from other tests running in parallel. + final SharedZkClientFactory testFactory = new SharedZkClientFactory(); + + HelixZkClient.ZkConnectionConfig connectionConfig = + new HelixZkClient.ZkConnectionConfig(ZK_ADDR); + HelixZkClient sharedZkClientA = + testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig()); + Assert.assertTrue(sharedZkClientA.waitUntilConnected(1, TimeUnit.SECONDS)); + + HelixZkClient sharedZkClientB = + testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig()); + Assert.assertTrue(sharedZkClientB.waitUntilConnected(1, TimeUnit.SECONDS)); + + Assert.assertEquals(testFactory.getActiveConnectionCount(), 1); + + // client A and B is sharing the same session. + Assert.assertEquals(sharedZkClientA.getSessionId(), sharedZkClientB.getSessionId()); + long sessionId = sharedZkClientA.getSessionId(); + + final int[] notificationCountA = { 0, 0 }; + sharedZkClientA.subscribeDataChanges(TEST_PATH, new IZkDataListener() { + @Override public void handleDataChange(String s, Object o) { + notificationCountA[0]++; + } + + @Override public void handleDataDeleted(String s) { + notificationCountA[1]++; + } + }); + final int[] notificationCountB = { 0, 0 }; + sharedZkClientB.subscribeDataChanges(TEST_PATH, new IZkDataListener() { + @Override public void handleDataChange(String s, Object o) { + notificationCountB[0]++; + } + + @Override public void handleDataDeleted(String s) { + notificationCountB[1]++; + } + }); + + // Modify using client A and client B will get notification. + sharedZkClientA.createPersistent(TEST_PATH, true); + Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() { + @Override public boolean verify() { + return notificationCountB[0] == 1; + } + }, 1000)); + Assert.assertEquals(notificationCountB[1], 0); + + sharedZkClientA.deleteRecursively(TEST_ROOT); + Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() { + @Override public boolean verify() { + return notificationCountB[1] == 1; + } + }, 1000)); + Assert.assertEquals(notificationCountB[0], 1); + + try { + sharedZkClientA.createEphemeral(TEST_PATH, true); + Assert.fail("Create Ephemeral nodes using shared client should fail."); + } catch (HelixException he) { + // expected. + } + + sharedZkClientA.close(); + // Shared client A closed. + Assert.assertTrue(sharedZkClientA.isClosed()); + Assert.assertFalse(sharedZkClientA.waitUntilConnected(100, TimeUnit.MILLISECONDS)); + // Shared client B still open. + Assert.assertFalse(sharedZkClientB.isClosed()); + Assert.assertTrue(sharedZkClientB.waitUntilConnected(100, TimeUnit.MILLISECONDS)); + + // client A cannot do any modify once closed. + try { + sharedZkClientA.createPersistent(TEST_PATH, true); + Assert.fail("Should not be able to create node with a closed client."); + } catch (Exception e) { + // expected to be here. + } + + // Now modify using client B, and client A won't get notification. + sharedZkClientB.createPersistent(TEST_PATH, true); + Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() { + @Override public boolean verify() { + return notificationCountB[0] == 2; + } + }, 1000)); + Assert.assertFalse(TestHelper.verify(new TestHelper.Verifier() { + @Override public boolean verify() { + return notificationCountA[0] == 2; + } + }, 1000)); + sharedZkClientB.deleteRecursively(TEST_ROOT); + + Assert.assertEquals(testFactory.getActiveConnectionCount(), 1); + + sharedZkClientB.close(); + // Shared client B closed. + Assert.assertTrue(sharedZkClientB.isClosed()); + Assert.assertFalse(sharedZkClientB.waitUntilConnected(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(testFactory.getActiveConnectionCount(), 0); + + // Try to create new shared ZkClient, will get a different session + HelixZkClient sharedZkClientC = + testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig()); + Assert.assertFalse(sessionId == sharedZkClientC.getSessionId()); + Assert.assertEquals(testFactory.getActiveConnectionCount(), 1); + + sharedZkClientC.close(); + // Shared client C closed. + Assert.assertTrue(sharedZkClientC.isClosed()); + Assert.assertFalse(sharedZkClientC.waitUntilConnected(100, TimeUnit.MILLISECONDS)); + Assert.assertEquals(testFactory.getActiveConnectionCount(), 0); + } +}
