This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 39bf0ad Deprecate Raw ZkClient in helix-core (#1070)
39bf0ad is described below
commit 39bf0ad429660797d70a0a1b53a2105e4ec1cd50
Author: Hunter Lee <[email protected]>
AuthorDate: Tue Jun 9 12:30:36 2020 -0700
Deprecate Raw ZkClient in helix-core (#1070)
We have moved the raw ZkClient to zookeeper-api. As such, we need to
deprecate the old one in helix-core. We are leaving the class for
backward-compatibility purposes.
---
.../java/org/apache/helix/manager/zk/ZkClient.java | 200 ++-------
.../src/test/java/org/apache/helix/TestHelper.java | 2 +-
.../org/apache/helix/manager/zk/TestZKWatch.java | 1 +
.../apache/helix/zookeeper/zkclient/ZkClient.java | 14 +-
.../apache/helix/zookeeper/impl/TestHelper.java | 133 ++++++
.../apache/helix/zookeeper/impl/ZkTestBase.java | 1 +
.../apache/helix/zookeeper/impl/ZkTestHelper.java | 446 +++++++++++++++++++++
.../zookeeper/impl/client}/TestRawZkClient.java | 78 ++--
8 files changed, 641 insertions(+), 234 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 4e38f26..e924d0e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -19,69 +19,34 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import org.apache.helix.HelixException;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.zkclient.IZkConnection;
-import org.apache.helix.zookeeper.zkclient.ZkConnection;
-import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
-import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
/**
- * Raw ZkClient that wraps {@link
org.apache.helix.manager.zk.zookeeper.ZkClient},
- * with additional constructors and builder.
- *
- * Note that, instead of directly constructing a raw ZkClient, applications
should always use
- * HelixZkClientFactory to build shared or dedicated HelixZkClient instances.
- * Only constructing a raw ZkClient when advanced usage is required.
- * For example, application need to access/manage ZkConnection directly.
- *
- * Both SharedZKClient and DedicatedZkClient are built based on the raw
ZkClient. As shown below.
- * ----------------------------
- * | |
- * --------------------- |
- * | | | *implements
- * SharedZkClient DedicatedZkClient ----> HelixZkClient Interface
- * | | |
- * --------------------- |
- * | |
- * Raw ZkClient (this class)--------
- * |
- * Native ZkClient
- *
- * TODO Completely replace usage of the raw ZkClient within helix-core.
Instead, using HelixZkClient. --JJ
+ * Do NOT use; deprecated and replaced by
org.apache.helix.zookeeper.impl.client.ZkClient.
*/
-
-public class ZkClient extends org.apache.helix.manager.zk.zookeeper.ZkClient
implements HelixZkClient {
- private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
-
- public static final int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
- public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
- public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
-
+@Deprecated
+public class ZkClient extends org.apache.helix.zookeeper.impl.client.ZkClient {
/**
- *
- * @param zkConnection
+ * @param zkConnection
* The Zookeeper connection
* @param connectionTimeout
* The connection timeout in milli seconds
- * @param zkSerializer
- * The Zookeeper data serializer
* @param operationRetryTimeout
* Most operations are retried in cases like connection loss with
the Zookeeper servers. During such failures, this
* <code>operationRetryTimeout</code> decides the maximum amount
of time, in milli seconds, each
* operation is retried. A value lesser than 0 is considered as
* "retry forever until a connection has been reestablished".
+ * @param zkSerializer
+ * The Zookeeper data serializer
* @param monitorType
* @param monitorKey
* @param monitorInstanceName
- * These 3 inputs are used to name JMX monitor bean name for this
ZkClient.
+ * These 3 inputs are used to build the JMX monitor bean name for this
RealmAwareZkClient.
* The JMX bean name will be:
HelixZkClient.monitorType.monitorKey.monitorInstanceName.
* @param monitorRootPathOnly
- * Should only stat of access to root path be reported to JMX
bean or path-specific stat be reported too.
*/
public ZkClient(IZkConnection zkConnection, int connectionTimeout, long
operationRetryTimeout,
PathBasedZkSerializer zkSerializer, String monitorType, String
monitorKey,
@@ -93,179 +58,70 @@ public class ZkClient extends
org.apache.helix.manager.zk.zookeeper.ZkClient imp
public ZkClient(IZkConnection connection, int connectionTimeout,
PathBasedZkSerializer zkSerializer, String monitorType, String
monitorKey,
long operationRetryTimeout) {
- this(connection, connectionTimeout, operationRetryTimeout, zkSerializer,
monitorType,
- monitorKey, null, true);
+ super(connection, connectionTimeout, zkSerializer, monitorType, monitorKey,
+ operationRetryTimeout);
}
public ZkClient(IZkConnection connection, int connectionTimeout,
PathBasedZkSerializer zkSerializer, String monitorType, String
monitorKey) {
- this(connection, connectionTimeout, zkSerializer, monitorType, monitorKey,
DEFAULT_OPERATION_TIMEOUT);
+ super(connection, connectionTimeout, zkSerializer, monitorType,
monitorKey);
}
public ZkClient(String zkServers, String monitorType, String monitorKey) {
- this(new ZkConnection(zkServers, DEFAULT_SESSION_TIMEOUT),
Integer.MAX_VALUE,
- new BasicZkSerializer(new SerializableSerializer()), monitorType,
monitorKey);
+ super(zkServers, monitorType, monitorKey);
}
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
PathBasedZkSerializer zkSerializer, String monitorType, String
monitorKey) {
- this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout,
zkSerializer, monitorType,
- monitorKey);
+ super(zkServers, sessionTimeout, connectionTimeout, zkSerializer,
monitorType, monitorKey);
}
public ZkClient(IZkConnection connection, int connectionTimeout,
PathBasedZkSerializer zkSerializer) {
- this(connection, connectionTimeout, zkSerializer, null, null);
+ super(connection, connectionTimeout, zkSerializer);
}
public ZkClient(IZkConnection connection, int connectionTimeout,
ZkSerializer zkSerializer) {
- this(connection, connectionTimeout, new BasicZkSerializer(zkSerializer));
+ super(connection, connectionTimeout, zkSerializer);
}
public ZkClient(IZkConnection connection, int connectionTimeout) {
- this(connection, connectionTimeout, new SerializableSerializer());
+ super(connection, connectionTimeout);
}
public ZkClient(IZkConnection connection) {
- this(connection, Integer.MAX_VALUE, new SerializableSerializer());
+ super(connection);
}
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
ZkSerializer zkSerializer) {
- this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout,
zkSerializer);
+ super(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
}
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
PathBasedZkSerializer zkSerializer) {
- this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout,
zkSerializer);
+ super(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
}
public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout)
{
- this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout,
- new SerializableSerializer());
+ super(zkServers, sessionTimeout, connectionTimeout);
}
public ZkClient(String zkServers, int connectionTimeout) {
- this(new ZkConnection(zkServers, DEFAULT_SESSION_TIMEOUT),
connectionTimeout,
- new SerializableSerializer());
+ super(zkServers, connectionTimeout);
}
public ZkClient(String zkServers) {
- this(zkServers, null, null);
+ super(zkServers);
}
- public ZkClient(final String zkServers, final int sessionTimeout, final int
connectionTimeout,
- final ZkSerializer zkSerializer, final long operationRetryTimeout) {
- this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout,
zkSerializer,
- operationRetryTimeout);
- }
-
- public ZkClient(final IZkConnection zkConnection, final int
connectionTimeout,
- final ZkSerializer zkSerializer, final long operationRetryTimeout) {
- this(zkConnection, connectionTimeout, operationRetryTimeout,
- new BasicZkSerializer(zkSerializer), null, null, null, false);
+ public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
+ ZkSerializer zkSerializer, long operationRetryTimeout) {
+ super(zkServers, sessionTimeout, connectionTimeout, zkSerializer,
operationRetryTimeout);
}
- public static class Builder {
- IZkConnection _connection;
- String _zkServer;
-
- PathBasedZkSerializer _zkSerializer;
-
- long _operationRetryTimeout = DEFAULT_OPERATION_TIMEOUT;
- int _connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
- int _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
-
- String _monitorType;
- String _monitorKey;
- String _monitorInstanceName = null;
- boolean _monitorRootPathOnly = true;
-
- public Builder setConnection(IZkConnection connection) {
- this._connection = connection;
- return this;
- }
-
- public Builder setConnectionTimeout(Integer connectionTimeout) {
- this._connectionTimeout = connectionTimeout;
- return this;
- }
-
- public Builder setZkSerializer(
- PathBasedZkSerializer zkSerializer) {
- this._zkSerializer = zkSerializer;
- return this;
- }
-
- public Builder setZkSerializer(ZkSerializer zkSerializer) {
- this._zkSerializer = new BasicZkSerializer(zkSerializer);
- return this;
- }
-
- /**
- * Used as part of the MBean ObjectName. This item is required for
enabling monitoring.
- * @param monitorType
- */
- public Builder setMonitorType(String monitorType) {
- this._monitorType = monitorType;
- return this;
- }
-
- /**
- * Used as part of the MBean ObjectName. This item is required for
enabling monitoring.
- * @param monitorKey
- */
- public Builder setMonitorKey(String monitorKey) {
- this._monitorKey = monitorKey;
- return this;
- }
-
- /**
- * Used as part of the MBean ObjectName. This item is optional.
- * @param instanceName
- */
- public Builder setMonitorInstanceName(String instanceName) {
- this._monitorInstanceName = instanceName;
- return this;
- }
-
-
- public Builder setMonitorRootPathOnly(Boolean monitorRootPathOnly) {
- this._monitorRootPathOnly = monitorRootPathOnly;
- return this;
- }
-
- public Builder setZkServer(String zkServer) {
- this._zkServer = zkServer;
- return this;
- }
-
- public Builder setSessionTimeout(Integer sessionTimeout) {
- this._sessionTimeout = sessionTimeout;
- return this;
- }
-
- public Builder setOperationRetryTimeout(Long operationRetryTimeout) {
- this._operationRetryTimeout = operationRetryTimeout;
- return this;
- }
-
- public ZkClient build() {
- if (_connection == null) {
- if (_zkServer == null) {
- throw new HelixException(
- "Failed to build ZkClient since no connection or ZK server
address is specified.");
- } else {
- _connection = new ZkConnection(_zkServer, _sessionTimeout);
- }
- }
-
- if (_zkSerializer == null) {
- _zkSerializer = new BasicZkSerializer(new SerializableSerializer());
- }
-
- return new ZkClient(_connection, _connectionTimeout,
_operationRetryTimeout, _zkSerializer,
- _monitorType, _monitorKey, _monitorInstanceName,
_monitorRootPathOnly);
- }
+ public ZkClient(IZkConnection zkConnection, int connectionTimeout,
ZkSerializer zkSerializer,
+ long operationRetryTimeout) {
+ super(zkConnection, connectionTimeout, zkSerializer,
operationRetryTimeout);
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java
b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index adb4812..18d8aae 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -785,7 +785,7 @@ public class TestHelper {
return sb.toString();
}
- public static interface Verifier {
+ public interface Verifier {
boolean verify() throws Exception;
}
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKWatch.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKWatch.java
index 1b6d543..c8db975 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKWatch.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKWatch.java
@@ -27,6 +27,7 @@ import org.apache.helix.ZkTestHelper;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 89f9e32..bc067c3 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -676,7 +676,7 @@ public class ZkClient implements Watcher {
if (isSessionAwareOperation(expectedSessionId, mode)) {
acquireEventLock();
try {
- final String actualSessionId =
toHexSessionId(zooKeeper.getSessionId());
+ final String actualSessionId =
Long.toHexString(zooKeeper.getSessionId());
if (!actualSessionId.equals(expectedSessionId)) {
throw new ZkSessionMismatchedException(
"Failed to create ephemeral node! There is a session id
mismatch. Expected: "
@@ -2110,7 +2110,7 @@ public class ZkClient implements Watcher {
* Ex. 1000a5ceb930004 is returned.
*/
private String getHexSessionId() {
- return toHexSessionId(getSessionId());
+ return Long.toHexString(getSessionId());
}
/*
@@ -2216,14 +2216,4 @@ public class ZkClient implements Watcher {
throw new IllegalArgumentException("Must not be done in the zookeeper
event thread.");
}
}
-
- /**
- * Converts a session id in hexadecimal notation from a long type session id.
- * Ex. 1000a5ceb930004 is returned.
- *
- * @return String representation of session id in hexadecimal notation.
- */
- private static String toHexSessionId(long sessionId) {
- return Long.toHexString(sessionId);
- }
}
diff --git
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java
new file mode 100644
index 0000000..9a6ac02
--- /dev/null
+++
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java
@@ -0,0 +1,133 @@
+package org.apache.helix.zookeeper.impl;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.ServerSocket;
+import java.util.Arrays;
+
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+
+public class TestHelper {
+ private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
+ public static final long WAIT_DURATION = 20 * 1000L; // 20 seconds
+
+ /**
+ * Returns an unused random port.
+ */
+ public static int getRandomPort()
+ throws IOException {
+ ServerSocket sock = new ServerSocket();
+ sock.bind(null);
+ int port = sock.getLocalPort();
+ sock.close();
+
+ return port;
+ }
+
+ static public void stopZkServer(ZkServer zkServer) {
+ if (zkServer != null) {
+ zkServer.shutdown();
+ System.out.println(
+ "Shut down zookeeper at port " + zkServer.getPort() + " in thread "
+ Thread
+ .currentThread().getName());
+ }
+ }
+
+ /**
+ * generic method for verification with a timeout
+ * @param verifierName
+ * @param args
+ */
+ public static void verifyWithTimeout(String verifierName, long timeout,
Object... args) {
+ final long sleepInterval = 1000; // in ms
+ final int loop = (int) (timeout / sleepInterval) + 1;
+ try {
+ boolean result = false;
+ int i = 0;
+ for (; i < loop; i++) {
+ Thread.sleep(sleepInterval);
+ // verifier should be static method
+ result = (Boolean) TestHelper.getMethod(verifierName).invoke(null,
args);
+
+ if (result) {
+ break;
+ }
+ }
+
+ System.err.println(
+ verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify " + " ("
+ result + ")");
+ LOG.debug("args:" + Arrays.toString(args));
+ // System.err.println("args:" + Arrays.toString(args));
+
+ if (!result) {
+ LOG.error(verifierName + " fails");
+ LOG.error("args:" + Arrays.toString(args));
+ }
+
+ Assert.assertTrue(result);
+ } catch (Exception e) {
+ LOG.error("Exception in verify: " + verifierName, e);
+ }
+ }
+
+ private static Method getMethod(String name) {
+ Method[] methods = TestHelper.class.getMethods();
+ for (Method method : methods) {
+ if (name.equals(method.getName())) {
+ return method;
+ }
+ }
+ return null;
+ }
+
+ public static String getTestMethodName() {
+ StackTraceElement[] calls = Thread.currentThread().getStackTrace();
+ return calls[2].getMethodName();
+ }
+
+ public static String getTestClassName() {
+ StackTraceElement[] calls = Thread.currentThread().getStackTrace();
+ String fullClassName = calls[2].getClassName();
+ return fullClassName.substring(fullClassName.lastIndexOf('.') + 1);
+ }
+
+ public interface Verifier {
+ boolean verify()
+ throws Exception;
+ }
+
+ public static boolean verify(Verifier verifier, long timeout)
+ throws Exception {
+ long start = System.currentTimeMillis();
+ do {
+ boolean result = verifier.verify();
+ if (result || (System.currentTimeMillis() - start) > timeout) {
+ return result;
+ }
+ Thread.sleep(50);
+ } while (true);
+ }
+}
diff --git
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
index 2b8b1b3..eff5610 100644
---
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
+++
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
@@ -53,6 +53,7 @@ public class ZkTestBase {
public static final String ZK_PREFIX = TestConstants.ZK_PREFIX;
public static final int ZK_START_PORT = TestConstants.ZK_START_PORT;
+ public static final String ZK_ADDR = ZK_PREFIX + ZK_START_PORT;
/*
* Multiple ZK references
diff --git
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestHelper.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestHelper.java
new file mode 100644
index 0000000..32e2b8a
--- /dev/null
+++
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestHelper.java
@@ -0,0 +1,446 @@
+package org.apache.helix.zookeeper.impl;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
+
+public class ZkTestHelper {
+ private static Logger LOG = LoggerFactory.getLogger(ZkTestHelper.class);
+ private static ExecutorService _executor =
Executors.newSingleThreadExecutor();
+
+ /**
+ * Simulate a zk state change by calling {@link
ZkClient#process(WatchedEvent)} directly
+ */
+ public static void simulateZkStateReconnected(RealmAwareZkClient client) {
+ ZkClient zkClient = (ZkClient) client;
+ WatchedEvent event = new WatchedEvent(EventType.None,
KeeperState.Disconnected, null);
+ zkClient.process(event);
+ event = new WatchedEvent(EventType.None, KeeperState.SyncConnected, null);
+ zkClient.process(event);
+ }
+
+ /**
+ * Get zk connection session id
+ * @param client
+ * @return
+ */
+ public static String getSessionId(RealmAwareZkClient client) {
+ ZkConnection connection = (ZkConnection) ((ZkClient)
client).getConnection();
+ ZooKeeper curZookeeper = connection.getZookeeper();
+ return Long.toHexString(curZookeeper.getSessionId());
+ }
+
+ /**
+ * Expire current zk session and wait for {@link
IZkStateListener#handleNewSession(String)} to be invoked
+ * @param client
+ * @throws Exception
+ */
+ public static void disconnectSession(HelixZkClient client)
+ throws Exception {
+ final ZkClient zkClient = (ZkClient) client;
+ IZkStateListener listener = new IZkStateListener() {
+ @Override
+ public void handleStateChanged(KeeperState state)
+ throws Exception {
+ // System.err.println("disconnectSession handleStateChanged. state: "
+ state);
+ }
+
+ @Override
+ public void handleNewSession(final String sessionId)
+ throws Exception {
+ // make sure zkclient is connected again
+ zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
TimeUnit.SECONDS);
+
+ LOG.info("handleNewSession. sessionId: {}.", sessionId);
+ }
+
+ @Override
+ public void handleSessionEstablishmentError(Throwable var1)
+ throws Exception {
+ }
+ };
+
+ zkClient.subscribeStateChanges(listener);
+ ZkConnection connection = (ZkConnection) zkClient.getConnection();
+ ZooKeeper curZookeeper = connection.getZookeeper();
+ LOG.info("Before expiry. sessionId: " +
Long.toHexString(curZookeeper.getSessionId()));
+
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ LOG.info("Process watchEvent: " + event);
+ }
+ };
+
+ final ZooKeeper dupZookeeper =
+ new ZooKeeper(connection.getServers(),
curZookeeper.getSessionTimeout(), watcher,
+ curZookeeper.getSessionId(), curZookeeper.getSessionPasswd());
+ // wait until connected, then close
+ while (dupZookeeper.getState() != States.CONNECTED) {
+ Thread.sleep(10);
+ }
+ dupZookeeper.close();
+
+ connection = (ZkConnection) zkClient.getConnection();
+ curZookeeper = connection.getZookeeper();
+ zkClient.unsubscribeStateChanges(listener);
+
+ // System.err.println("zk: " + oldZookeeper);
+ LOG.info("After expiry. sessionId: " +
Long.toHexString(curZookeeper.getSessionId()));
+ }
+
+ public static void expireSession(RealmAwareZkClient client)
+ throws Exception {
+ final CountDownLatch waitNewSession = new CountDownLatch(1);
+ final ZkClient zkClient = (ZkClient) client;
+
+ IZkStateListener listener = new IZkStateListener() {
+ @Override
+ public void handleStateChanged(KeeperState state)
+ throws Exception {
+ LOG.info("IZkStateListener#handleStateChanged, state: " + state);
+ }
+
+ @Override
+ public void handleNewSession(final String sessionId)
+ throws Exception {
+ // make sure zkclient is connected again
+ zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
TimeUnit.SECONDS);
+
+ LOG.info("handleNewSession. sessionId: {}.", sessionId);
+ waitNewSession.countDown();
+ }
+
+ @Override
+ public void handleSessionEstablishmentError(Throwable var1)
+ throws Exception {
+ }
+ };
+
+ zkClient.subscribeStateChanges(listener);
+
+ ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+ ZooKeeper curZookeeper = connection.getZookeeper();
+ String oldSessionId = Long.toHexString(curZookeeper.getSessionId());
+ LOG.info("Before session expiry. sessionId: " + oldSessionId + ", zk: " +
curZookeeper);
+
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ LOG.info("Watcher#process, event: " + event);
+ }
+ };
+
+ final ZooKeeper dupZookeeper =
+ new ZooKeeper(connection.getServers(),
curZookeeper.getSessionTimeout(), watcher,
+ curZookeeper.getSessionId(), curZookeeper.getSessionPasswd());
+ // wait until connected, then close
+ while (dupZookeeper.getState() != States.CONNECTED) {
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(dupZookeeper.getState(), States.CONNECTED,
+ "Fail to connect to zk using current session info");
+ dupZookeeper.close();
+
+ // make sure session expiry really happens
+ waitNewSession.await();
+ zkClient.unsubscribeStateChanges(listener);
+
+ connection = (ZkConnection) zkClient.getConnection();
+ curZookeeper = connection.getZookeeper();
+
+ String newSessionId = Long.toHexString(curZookeeper.getSessionId());
+ LOG.info("After session expiry. sessionId: " + newSessionId + ", zk: " +
curZookeeper);
+ Assert.assertFalse(newSessionId.equals(oldSessionId),
+ "Fail to expire current session, zk: " + curZookeeper);
+ }
+
+ /**
+ * expire zk session asynchronously
+ * @param client
+ * @throws Exception
+ */
+ public static void asyncExpireSession(RealmAwareZkClient client)
+ throws Exception {
+ final ZkClient zkClient = (ZkClient) client;
+ ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+ ZooKeeper curZookeeper = connection.getZookeeper();
+ LOG.info("Before expiry. sessionId: " +
Long.toHexString(curZookeeper.getSessionId()));
+
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ LOG.info("Process watchEvent: " + event);
+ }
+ };
+
+ final ZooKeeper dupZookeeper =
+ new ZooKeeper(connection.getServers(),
curZookeeper.getSessionTimeout(), watcher,
+ curZookeeper.getSessionId(), curZookeeper.getSessionPasswd());
+ // wait until connected, then close
+ while (dupZookeeper.getState() != States.CONNECTED) {
+ Thread.sleep(10);
+ }
+ dupZookeeper.close();
+
+ connection = (ZkConnection) zkClient.getConnection();
+ curZookeeper = connection.getZookeeper();
+
+ // System.err.println("zk: " + oldZookeeper);
+ LOG.info("After expiry. sessionId: " +
Long.toHexString(curZookeeper.getSessionId()));
+ }
+
+ /**
+ * return the number of listeners on given zk-path
+ * @param zkAddr
+ * @param path
+ * @return
+ * @throws Exception
+ */
+ public static int numberOfListeners(String zkAddr, String path)
+ throws Exception {
+ Map<String, Set<String>> listenerMap = getListenersByZkPath(zkAddr);
+ if (listenerMap.containsKey(path)) {
+ return listenerMap.get(path).size();
+ }
+ return 0;
+ }
+
+ /**
+ * return a map from zk-path to the set of zk-session-ids that put watches on
the zk-path
+ * @param zkAddr
+ * @return
+ * @throws Exception
+ */
+ public static Map<String, Set<String>> getListenersByZkPath(String zkAddr)
+ throws Exception {
+ String splits[] = zkAddr.split(":");
+ Map<String, Set<String>> listenerMap = new TreeMap<>();
+ Socket sock = null;
+ int retry = 5;
+
+ while (retry > 0) {
+ try {
+ sock = new Socket(splits[0], Integer.parseInt(splits[1]));
+ PrintWriter out = new PrintWriter(sock.getOutputStream(), true);
+ BufferedReader in = new BufferedReader(new
InputStreamReader(sock.getInputStream()));
+
+ out.println("wchp");
+
+ listenerMap.clear();
+ String lastPath = null;
+ String line = in.readLine();
+ while (line != null) {
+ line = line.trim();
+
+ if (line.startsWith("/")) {
+ lastPath = line;
+ if (!listenerMap.containsKey(lastPath)) {
+ listenerMap.put(lastPath, new TreeSet<String>());
+ }
+ } else if (line.startsWith("0x")) {
+ if (lastPath != null && listenerMap.containsKey(lastPath)) {
+ listenerMap.get(lastPath).add(line);
+ } else {
+ LOG.error("Not path associated with listener sessionId: " + line
+ ", lastPath: "
+ + lastPath);
+ }
+ } else {
+ // LOG.error("unrecognized line: " + line);
+ }
+ line = in.readLine();
+ }
+ break;
+ } catch (Exception e) {
+ // sometimes in test, we see connection-reset exceptions when
in.readLine()
+ // so add this retry logic
+ retry--;
+ } finally {
+ if (sock != null) {
+ sock.close();
+ }
+ }
+ }
+ return listenerMap;
+ }
+
+ /**
+ * return a map from session-id to the set of zk-paths that the session has
watches on
+ * @return
+ */
+ public static Map<String, Set<String>> getListenersBySession(String zkAddr)
+ throws Exception {
+ Map<String, Set<String>> listenerMapByInstance =
getListenersByZkPath(zkAddr);
+
+ // convert to index by sessionId
+ Map<String, Set<String>> listenerMapBySession = new TreeMap<>();
+ for (String path : listenerMapByInstance.keySet()) {
+ for (String sessionId : listenerMapByInstance.get(path)) {
+ if (!listenerMapBySession.containsKey(sessionId)) {
+ listenerMapBySession.put(sessionId, new TreeSet<String>());
+ }
+ listenerMapBySession.get(sessionId).add(path);
+ }
+ }
+
+ return listenerMapBySession;
+ }
+
+ static java.lang.reflect.Field getField(Class clazz, String fieldName)
+ throws NoSuchFieldException {
+ try {
+ return clazz.getDeclaredField(fieldName);
+ } catch (NoSuchFieldException e) {
+ Class superClass = clazz.getSuperclass();
+ if (superClass == null) {
+ throw e;
+ } else {
+ return getField(superClass, fieldName);
+ }
+ }
+ }
+
+ public static Map<String, List<String>> getZkWatch(RealmAwareZkClient client)
+ throws Exception {
+ Map<String, List<String>> lists = new HashMap<String, List<String>>();
+ ZkClient zkClient = (ZkClient) client;
+
+ ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+ ZooKeeper zk = connection.getZookeeper();
+
+ java.lang.reflect.Field field = getField(zk.getClass(), "watchManager");
+ field.setAccessible(true);
+ Object watchManager = field.get(zk);
+
+ java.lang.reflect.Field field2 = getField(watchManager.getClass(),
"dataWatches");
+ field2.setAccessible(true);
+ HashMap<String, Set<Watcher>> dataWatches =
+ (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+ field2 = getField(watchManager.getClass(), "existWatches");
+ field2.setAccessible(true);
+ HashMap<String, Set<Watcher>> existWatches =
+ (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+ field2 = getField(watchManager.getClass(), "childWatches");
+ field2.setAccessible(true);
+ HashMap<String, Set<Watcher>> childWatches =
+ (HashMap<String, Set<Watcher>>) field2.get(watchManager);
+
+ lists.put("dataWatches", new ArrayList<>(dataWatches.keySet()));
+ lists.put("existWatches", new ArrayList<>(existWatches.keySet()));
+ lists.put("childWatches", new ArrayList<>(childWatches.keySet()));
+
+ return lists;
+ }
+
+ public static Map<String, Set<IZkDataListener>>
getZkDataListener(RealmAwareZkClient client)
+ throws Exception {
+ java.lang.reflect.Field field = getField(client.getClass(),
"_dataListener");
+ field.setAccessible(true);
+ Map<String, Set<IZkDataListener>> dataListener =
+ (Map<String, Set<IZkDataListener>>) field.get(client);
+ return dataListener;
+ }
+
+ public static Map<String, Set<IZkChildListener>>
getZkChildListener(RealmAwareZkClient client)
+ throws Exception {
+ java.lang.reflect.Field field = getField(client.getClass(),
"_childListener");
+ field.setAccessible(true);
+ Map<String, Set<IZkChildListener>> childListener =
+ (Map<String, Set<IZkChildListener>>) field.get(client);
+ return childListener;
+ }
+
+ public static boolean tryWaitZkEventsCleaned(RealmAwareZkClient zkclient)
+ throws Exception {
+ java.lang.reflect.Field field = getField(zkclient.getClass(),
"_eventThread");
+ field.setAccessible(true);
+ Object eventThread = field.get(zkclient);
+ // System.out.println("field: " + eventThread);
+
+ java.lang.reflect.Field field2 = getField(eventThread.getClass(),
"_events");
+ field2.setAccessible(true);
+ BlockingQueue queue = (BlockingQueue) field2.get(eventThread);
+ // System.out.println("field2: " + queue + ", " + queue.size());
+
+ if (queue == null) {
+ LOG.error("fail to get event-queue from zkclient. skip waiting");
+ return false;
+ }
+
+ for (int i = 0; i < 20; i++) {
+ if (queue.size() == 0) {
+ return true;
+ }
+ Thread.sleep(100);
+ System.out.println("pending zk-events in queue: " + queue);
+ }
+ return false;
+ }
+
+ public static void injectExpire(RealmAwareZkClient client)
+ throws ExecutionException, InterruptedException {
+ final ZkClient zkClient = (ZkClient) client;
+ Future future = _executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ WatchedEvent event = new WatchedEvent(EventType.None,
KeeperState.Expired, null);
+ zkClient.process(event);
+ }
+ });
+ future.get();
+ }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
similarity index 93%
rename from
helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
rename to
zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
index 5771003..2388ed7 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
@@ -1,4 +1,4 @@
-package org.apache.helix.manager.zk;
+package org.apache.helix.zookeeper.impl.client;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -34,23 +34,24 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.helix.HelixException;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZkTestHelper;
-import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
-import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
-import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.exception.ZkClientException;
+import org.apache.helix.zookeeper.impl.TestHelper;
+import org.apache.helix.zookeeper.impl.ZkTestBase;
+import org.apache.helix.zookeeper.impl.ZkTestHelper;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import
org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
+import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
+import org.apache.helix.zookeeper.zkclient.metric.ZkClientPathMonitor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -65,7 +66,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class TestRawZkClient extends ZkUnitTestBase {
+public class TestRawZkClient extends ZkTestBase {
private final String TEST_TAG = "test_monitor";
private final String TEST_ROOT = "/my_cluster/IDEALSTATES";
@@ -73,7 +74,7 @@ public class TestRawZkClient extends ZkUnitTestBase {
@BeforeClass
public void beforeClass() {
- _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
}
@AfterClass
@@ -265,7 +266,7 @@ public class TestRawZkClient extends ZkUnitTestBase {
throws Exception {
final String TEST_KEY = "testZkClientMonitor";
ZkClient.Builder builder = new ZkClient.Builder();
-
builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
+
builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
.setMonitorRootPathOnly(false);
ZkClient zkClient = builder.build();
@@ -452,7 +453,7 @@ public class TestRawZkClient extends ZkUnitTestBase {
final int zkPort = TestHelper.getRandomPort();
final String zkAddr = String.format("localhost:%d", zkPort);
- final ZkServer zkServer = TestHelper.startZkServer(zkAddr);
+ final ZkServer zkServer = startZkServer(zkAddr);
try {
ZkClient.Builder builder = new ZkClient.Builder();
@@ -492,21 +493,9 @@ public class TestRawZkClient extends ZkUnitTestBase {
@Test
public void testCreateEphemeralWithValidSession()
throws Exception {
- final String className = TestHelper.getTestClassName();
- final String methodName = TestHelper.getTestMethodName();
- final String clusterName = className + "_" + methodName;
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 3, // replicas
- "MasterSlave", true); // do rebalance
-
- final String originalSessionId =
ZKUtil.toHexSessionId(_zkClient.getSessionId());
- final String path = "/" + methodName;
+ final String originalSessionId =
Long.toHexString(_zkClient.getSessionId());
+ final String path = "/" + TestHelper.getTestMethodName();
final String data = "Hello Helix";
// Verify the node is not existed yet.
@@ -531,7 +520,7 @@ public class TestRawZkClient extends ZkUnitTestBase {
Assert.assertEquals(nodeData, data, "Data is not correct.");
Assert.assertTrue(stat.getEphemeralOwner() != 0L,
"Ephemeral owner should NOT be zero because the node is an ephemeral
node.");
- Assert.assertEquals(ZKUtil.toHexSessionId(stat.getEphemeralOwner()),
originalSessionId,
+ Assert.assertEquals(Long.toHexString(stat.getEphemeralOwner()),
originalSessionId,
"Ephemeral node is created by an unexpected session");
// Delete the node to clean up, otherwise, the ephemeral node would be
existed
@@ -555,17 +544,8 @@ public class TestRawZkClient extends ZkUnitTestBase {
final String methodName = TestHelper.getTestMethodName();
final String clusterName = className + "_" + methodName;
- TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
- "localhost", // participant name prefix
- "TestDB", // resource name prefix
- 1, // resources
- 10, // partitions per resource
- 5, // number of nodes
- 3, // replicas
- "MasterSlave", true); // do rebalance
-
final long originalSessionId = _zkClient.getSessionId();
- final String originalHexSessionId =
ZKUtil.toHexSessionId(originalSessionId);
+ final String originalHexSessionId = Long.toHexString(originalSessionId);
final String path = "/" + methodName;
// Verify the node is not existed.
@@ -579,7 +559,7 @@ public class TestRawZkClient extends ZkUnitTestBase {
try {
// New session id should not equal to expired session id.
return _zkClient.getSessionId() != originalSessionId;
- } catch (HelixException ex) {
+ } catch (ZkClientException ex) {
return false;
}
}, 1000L));
@@ -612,17 +592,17 @@ public class TestRawZkClient extends ZkUnitTestBase {
final String methodName = TestHelper.getTestMethodName();
final ZkClient zkClient =
- new
ZkClient.Builder().setZkServer(ZK_ADDR).setOperationRetryTimeout(3000L) // 3
seconds
+ new
ZkClient.Builder().setZkServer(ZkTestBase.ZK_ADDR).setOperationRetryTimeout(3000L)
// 3 seconds
.build();
- final String expectedSessionId =
ZKUtil.toHexSessionId(zkClient.getSessionId());
+ final String expectedSessionId = Long.toHexString(zkClient.getSessionId());
final String path = "/" + methodName;
final String data = "data";
Assert.assertFalse(zkClient.exists(path));
// Shutdown zk server so zk operations will fail due to disconnection.
- TestHelper.stopZkServer(_zkServer);
+ TestHelper.stopZkServer(_zkServerMap.get(ZK_ADDR));
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
@@ -655,7 +635,7 @@ public class TestRawZkClient extends ZkUnitTestBase {
} finally {
zkClient.close();
// Recover zk server.
- _zkServer.start();
+ _zkServerMap.get(ZK_ADDR).start();
}
}
@@ -675,14 +655,14 @@ public class TestRawZkClient extends ZkUnitTestBase {
throws Exception {
final String methodName = TestHelper.getTestMethodName();
- final String expectedSessionId =
ZKUtil.toHexSessionId(_zkClient.getSessionId());
+ final String expectedSessionId =
Long.toHexString(_zkClient.getSessionId());
final String path = "/" + methodName;
final String data = "data";
Assert.assertFalse(_zkClient.exists(path));
// Shutdown zk server so zk operations will fail due to disconnection.
- TestHelper.stopZkServer(_zkServer);
+ TestHelper.stopZkServer(_zkServerMap.get(ZK_ADDR));
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicBoolean running = new AtomicBoolean(true);
@@ -704,7 +684,7 @@ public class TestRawZkClient extends ZkUnitTestBase {
TimeUnit.SECONDS.sleep(10);
System.out.println("Restarting zk server...");
- _zkServer.start();
+ _zkServerMap.get(ZK_ADDR).start();
// Wait for creating ephemeral node successfully.
final boolean creationThreadTerminated = countDownLatch.await(10,
TimeUnit.SECONDS);
@@ -722,7 +702,7 @@ public class TestRawZkClient extends ZkUnitTestBase {
Assert.assertEquals(nodeData, data, "Data is not correct.");
Assert.assertTrue(stat.getEphemeralOwner() != 0L,
"Ephemeral owner should NOT be zero because the node is an ephemeral
node.");
- Assert.assertEquals(ZKUtil.toHexSessionId(stat.getEphemeralOwner()),
expectedSessionId,
+ Assert.assertEquals(Long.toHexString(stat.getEphemeralOwner()),
expectedSessionId,
"Ephemeral node is created by an unexpected session");
// Delete the node to clean up, otherwise, the ephemeral node would be
existed until the session
@@ -732,9 +712,9 @@ public class TestRawZkClient extends ZkUnitTestBase {
@Test
public void testWaitForEstablishedSession() {
- ZkClient zkClient = new ZkClient(ZK_ADDR);
+ ZkClient zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
Assert.assertTrue(zkClient.waitForEstablishedSession(1, TimeUnit.SECONDS)
!= 0L);
- TestHelper.stopZkServer(_zkServer);
+ TestHelper.stopZkServer(_zkServerMap.get(ZK_ADDR));
Assert.assertTrue(zkClient.waitForKeeperState(KeeperState.Disconnected, 1,
TimeUnit.SECONDS));
try {
@@ -747,12 +727,12 @@ public class TestRawZkClient extends ZkUnitTestBase {
zkClient.close();
// Recover zk server for later tests.
- _zkServer.start();
+ _zkServerMap.get(ZK_ADDR).start();
}
@Test
public void testAsyncWriteOperations() {
- ZkClient zkClient = new ZkClient(ZK_ADDR);
+ ZkClient zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
String originSizeLimit =
System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
"2000");