Repository: zookeeper Updated Branches: refs/heads/master 75c652f45 -> 9f8279841
ZOOKEEPER-2251: Add Client side packet response timeout to avoid infinite wait. Add Client side packet response timeout to avoid infinite wait. Author: Mohammad Arshad <ars...@apache.org> Reviewers: Michael Han <h...@apache.org>, Enrico Olivelli <eolive...@gmail.com> Closes #119 from arshadmohammad/ZOOKEEPER-2251 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/9f827984 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/9f827984 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/9f827984 Branch: refs/heads/master Commit: 9f82798415351a20136ceb1640b1781723e51cc1 Parents: 75c652f Author: Mohammad Arshad <ars...@apache.org> Authored: Thu Jul 26 20:16:04 2018 -0700 Committer: Michael Han <h...@apache.org> Committed: Thu Jul 26 20:16:04 2018 -0700 ---------------------------------------------------------------------- .../main/org/apache/zookeeper/ClientCnxn.java | 78 +++++++-- .../org/apache/zookeeper/KeeperException.java | 13 ++ .../main/org/apache/zookeeper/ZooKeeper.java | 11 +- .../apache/zookeeper/client/ZKClientConfig.java | 37 +++++ .../zookeeper/ClientRequestTimeoutTest.java | 165 +++++++++++++++++++ .../content/xdocs/zookeeperProgrammers.xml | 20 +++ 6 files changed, 311 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/ClientCnxn.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/ClientCnxn.java b/src/java/main/org/apache/zookeeper/ClientCnxn.java index ba601bc..b28c980 100644 --- a/src/java/main/org/apache/zookeeper/ClientCnxn.java +++ b/src/java/main/org/apache/zookeeper/ClientCnxn.java @@ -201,6 +201,11 @@ public class ClientCnxn { public ZooKeeperSaslClient zooKeeperSaslClient; private final ZKClientConfig clientConfig; + /** + * If any 
request's response in not received in configured requestTimeout + * then it is assumed that the response packet is lost. + */ + private long requestTimeout; public long getSessionId() { return sessionId; @@ -395,6 +400,7 @@ public class ClientCnxn { sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread(); this.clientConfig=zooKeeper.getClientConfig(); + initRequestTimeout(); } public void start() { @@ -671,7 +677,8 @@ public class ClientCnxn { } } - private void finishPacket(Packet p) { + // @VisibleForTesting + protected void finishPacket(Packet p) { int err = p.replyHeader.getErr(); if (p.watchRegistration != null) { p.watchRegistration.register(err); @@ -1246,15 +1253,7 @@ public class ClientCnxn { } // At this point, there might still be new packets appended to outgoingQueue. // they will be handled in next connection or cleared up if closed. - cleanup(); - if (state.isAlive()) { - eventThread.queueEvent(new WatchedEvent( - Event.EventType.None, - Event.KeeperState.Disconnected, - null)); - } - clientCnxnSocket.updateNow(); - clientCnxnSocket.updateLastSendAndHeard(); + cleanAndNotifyState(); } } } @@ -1275,6 +1274,16 @@ public class ClientCnxn { + Long.toHexString(getSessionId())); } + private void cleanAndNotifyState() { + cleanup(); + if (state.isAlive()) { + eventThread.queueEvent(new WatchedEvent(Event.EventType.None, + Event.KeeperState.Disconnected, null)); + } + clientCnxnSocket.updateNow(); + clientCnxnSocket.updateLastSendAndHeard(); + } + private void pingRwServer() throws RWServerFoundException { String result = null; InetSocketAddress addr = hostProvider.next(0); @@ -1506,13 +1515,40 @@ public class ClientCnxn { Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration); synchronized (packet) { - while (!packet.finished) { - packet.wait(); + if (requestTimeout > 0) { + // Wait for request completion with timeout + waitForPacketFinish(r, packet); + } else { + // Wait 
for request completion infinitely + while (!packet.finished) { + packet.wait(); + } } } + if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) { + sendThread.cleanAndNotifyState(); + } return r; } + /** + * Wait for request completion with timeout. + */ + private void waitForPacketFinish(ReplyHeader r, Packet packet) + throws InterruptedException { + long waitStartTime = Time.currentElapsedTime(); + while (!packet.finished) { + packet.wait(requestTimeout); + if (!packet.finished && ((Time.currentElapsedTime() + - waitStartTime) >= requestTimeout)) { + LOG.error("Timeout error occurred for the packet '{}'.", + packet); + r.setErr(Code.REQUESTTIMEOUT.intValue()); + break; + } + } + } + public void saslCompleted() { sendThread.getClientCnxnSocket().saslCompleted(); } @@ -1603,4 +1639,22 @@ public class ClientCnxn { this.ctx = ctx; } } + + private void initRequestTimeout() { + try { + requestTimeout = clientConfig.getLong( + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT); + LOG.info("{} value is {}. 
feature enabled=", + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, + requestTimeout, requestTimeout > 0); + } catch (NumberFormatException e) { + LOG.error( + "Configured value {} for property {} can not be parsed to long.", + clientConfig.getProperty( + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT), + ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT); + throw e; + } + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/KeeperException.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/KeeperException.java b/src/java/main/org/apache/zookeeper/KeeperException.java index 143fac5..f797bb0 100644 --- a/src/java/main/org/apache/zookeeper/KeeperException.java +++ b/src/java/main/org/apache/zookeeper/KeeperException.java @@ -144,6 +144,8 @@ public abstract class KeeperException extends Exception { return new NoWatcherException(); case RECONFIGDISABLED: return new ReconfigDisabledException(); + case REQUESTTIMEOUT: + return new RequestTimeoutException(); case OK: default: throw new IllegalArgumentException("Invalid exception code"); @@ -392,6 +394,8 @@ public abstract class KeeperException extends Exception { EPHEMERALONLOCALSESSION (EphemeralOnLocalSession), /** Attempts to remove a non-existing watcher */ NOWATCHER (-121), + /** Request not completed within max allowed time.*/ + REQUESTTIMEOUT (-122), /** Attempts to perform a reconfiguration operation when reconfiguration feature is disabled. 
*/ RECONFIGDISABLED(-123); @@ -843,4 +847,13 @@ public abstract class KeeperException extends Exception { super(Code.RECONFIGDISABLED, path); } } + + /** + * @see Code#REQUESTTIMEOUT + */ + public static class RequestTimeoutException extends KeeperException { + public RequestTimeoutException() { + super(Code.REQUESTTIMEOUT); + } + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/ZooKeeper.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/ZooKeeper.java b/src/java/main/org/apache/zookeeper/ZooKeeper.java index ebcc500..6cac98e 100644 --- a/src/java/main/org/apache/zookeeper/ZooKeeper.java +++ b/src/java/main/org/apache/zookeeper/ZooKeeper.java @@ -876,12 +876,21 @@ public class ZooKeeper implements AutoCloseable { connectString); hostProvider = aHostProvider; - cnxn = new ClientCnxn(connectStringParser.getChrootPath(), + cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); } + // @VisibleForTesting + protected ClientCnxn createConnection(String chrootPath, + HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, + ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, + boolean canBeReadOnly) throws IOException { + return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, + watchManager, clientCnxnSocket, canBeReadOnly); + } + /** * To create a ZooKeeper client object, the application needs to pass a * connection string containing a comma separated list of host:port pairs, http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java 
b/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java index 3c10627..097f2f0 100644 --- a/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java +++ b/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java @@ -56,9 +56,15 @@ public class ZKClientConfig extends ZKConfig { @SuppressWarnings("deprecation") public static final String SECURE_CLIENT = ZooKeeper.SECURE_CLIENT; public static final int CLIENT_MAX_PACKET_LENGTH_DEFAULT = 4096 * 1024; /* 4 MB */ + public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout"; + /** + * Feature is disabled by default. + */ + public static final long ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT = 0; public ZKClientConfig() { super(); + initFromJavaSystemProperties(); } public ZKClientConfig(File configFile) throws ConfigException { @@ -69,6 +75,15 @@ public class ZKClientConfig extends ZKConfig { super(configPath); } + /** + * Initialize all the ZooKeeper client properties which are configurable as + * java system property + */ + private void initFromJavaSystemProperties() { + setProperty(ZOOKEEPER_REQUEST_TIMEOUT, + System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT)); + } + @Override protected void handleBackwardCompatibility() { /** @@ -100,4 +115,26 @@ public class ZKClientConfig extends ZKConfig { public boolean isSaslClientEnabled() { return Boolean.valueOf(getProperty(ENABLE_CLIENT_SASL_KEY, ENABLE_CLIENT_SASL_DEFAULT)); } + + /** + * Get the value of the <code>key</code> property as an <code>long</code>. + * If property is not set, the provided <code>defaultValue</code> is + * returned + * + * @param key + * property key. + * @param defaultValue + * default value. 
+ * @throws NumberFormatException + * when the value is invalid + * @return return property value as an <code>long</code>, or + * <code>defaultValue</code> + */ + public long getLong(String key, long defaultValue) { + String value = getProperty(key); + if (value != null) { + return Long.parseLong(value.trim()); + } + return defaultValue; + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java b/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java new file mode 100644 index 0000000..4f5548d --- /dev/null +++ b/src/java/test/org/apache/zookeeper/ClientRequestTimeoutTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper; + +import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.client.HostProvider; +import org.apache.zookeeper.server.quorum.QuorumPeerTestBase; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.Assert; +import org.junit.Test; + +public class ClientRequestTimeoutTest extends QuorumPeerTestBase { + private static final int SERVER_COUNT = 3; + private boolean dropPacket = false; + private int dropPacketType = ZooDefs.OpCode.create; + + @Test(timeout = 120000) + public void testClientRequestTimeout() throws Exception { + int requestTimeOut = 15000; + System.setProperty("zookeeper.request.timeout", + Integer.toString(requestTimeOut)); + final int clientPorts[] = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + String server; + + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + server = "server." 
+ i + "=127.0.0.1:" + PortAssignment.unique() + + ":" + PortAssignment.unique() + ":participant;127.0.0.1:" + + clientPorts[i]; + sb.append(server + "\n"); + } + String currentQuorumCfgSection = sb.toString(); + MainThread mt[] = new MainThread[SERVER_COUNT]; + + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, + false); + mt[i].start(); + } + + // ensure server started + for (int i = 0; i < SERVER_COUNT; i++) { + Assert.assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], + CONNECTION_TIMEOUT)); + } + + CountdownWatcher watch1 = new CountdownWatcher(); + CustomZooKeeper zk = new CustomZooKeeper(getCxnString(clientPorts), + ClientBase.CONNECTION_TIMEOUT, watch1); + watch1.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + + String data = "originalData"; + // lets see one successful operation + zk.create("/clientHang1", data.getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + + // now make environment for client hang + dropPacket = true; + dropPacketType = ZooDefs.OpCode.create; + + // Test synchronous API + try { + zk.create("/clientHang2", data.getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + fail("KeeperException is expected."); + } catch (KeeperException exception) { + assertEquals(KeeperException.Code.REQUESTTIMEOUT.intValue(), + exception.code().intValue()); + } + // reset the error behavior + dropPacket = false; + watch1.waitForConnected(ClientBase.CONNECTION_TIMEOUT); + + String path = "/clientHang3"; + String create = zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + assertEquals(path, create); + + // do cleanup + zk.close(); + for (int i = 0; i < SERVER_COUNT; i++) { + mt[i].shutdown(); + } + } + + /** + * @return connection string in the form of + * 127.0.0.1:port1,127.0.0.1:port2,127.0.0.1:port3 + */ + private String getCxnString(int[] clientPorts) { + StringBuffer 
hostPortBuffer = new StringBuffer(); + for (int i = 0; i < clientPorts.length; i++) { + hostPortBuffer.append("127.0.0.1:"); + hostPortBuffer.append(clientPorts[i]); + if (i != (clientPorts.length - 1)) { + hostPortBuffer.append(','); + } + } + return hostPortBuffer.toString(); + } + + class CustomClientCnxn extends ClientCnxn { + + public CustomClientCnxn(String chrootPath, HostProvider hostProvider, + int sessionTimeout, ZooKeeper zooKeeper, + ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, + boolean canBeReadOnly) throws IOException { + super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, + clientCnxnSocket, canBeReadOnly); + } + + @Override + public void finishPacket(Packet p) { + if (dropPacket && p.requestHeader.getType() == dropPacketType) { + // do nothing, just return, it is the same as packet is dropped + // by the network + return; + } + super.finishPacket(p); + } + } + + class CustomZooKeeper extends ZooKeeper { + public CustomZooKeeper(String connectString, int sessionTimeout, + Watcher watcher) throws IOException { + super(connectString, sessionTimeout, watcher); + } + + @Override + protected ClientCnxn createConnection(String chrootPath, + HostProvider hostProvider, int sessionTimeout, + ZooKeeper zooKeeper, ClientWatchManager watcher, + ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) + throws IOException { + return new CustomClientCnxn(chrootPath, hostProvider, + sessionTimeout, zooKeeper, watcher, clientCnxnSocket, + canBeReadOnly); + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/9f827984/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml ---------------------------------------------------------------------- diff --git a/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml b/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml index a2a978f..cca60b3 100644 --- a/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml 
+++ b/zookeeper-docs/src/documentation/content/xdocs/zookeeperProgrammers.xml @@ -1553,6 +1553,26 @@ public abstract class ServerAuthenticationProvider implements AuthenticationProv <para>Specifies path to kinit binary. Default is "/usr/bin/kinit".</para> </listitem> </varlistentry> + <varlistentry> + <term>zookeeper.request.timeout</term> + <listitem> + <para> + <emphasis role="bold">New in 3.6.0,3.5.5:</emphasis> + If ZooKeeper server is not responding or if there is a delay in the + network, ZooKeeper client java sync API waits infinitely for the + response. To avoid this situation configure + zookeeper.request.timeout. By default this feature is disabled and + default value is 0. To enable this feature configure a positive + integer value. for example to set value to 30 second configure + zookeeper.request.timeout=30000. + </para> + <para> + If response is not received within configured zookeeper.request.timeout + then outgoing and pending requests are cancelled with + org.apache.zookeeper.KeeperException.ConnectionLossException. + </para> + </listitem> + </varlistentry> </variablelist> </section> </section>