This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch helix-0.9.x-hotfix
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 1401ceb1f12acd8793ef5fbd8002070e842ee5d9
Author: Huizhi Lu <[email protected]>
AuthorDate: Thu Jul 23 11:03:54 2020 -0700

    ZkClient should not keep retrying getChildren() due to large number of children (#1109)

    For ZkClient's getChildren() operation, if there are a large number of children and the
    response packet size exceeds the jute.maxbuffer default value of 4 MB on the ZK client side,
    ZkClient gets a ConnectionLossException and keeps retrying the connection to ZK. As a
    consequence, the infinite retry may cause heavy GC on the ZK server and kill it. This commit
    implements a workaround that exits the retry loop for getChildren() when a large number of
    children causes the connection loss.
---
 .../helix/manager/zk/zookeeper/ZkClient.java       | 72 +++++++++++++++-
 .../apache/helix/manager/zk/TestRawZkClient.java   | 95 ++++++++++++++++++----
 2 files changed, 148 insertions(+), 19 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 1b4d1ca..e77761f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -71,8 +71,14 @@ import org.slf4j.LoggerFactory;
  * WARN: Do not use this class directly, use {@link org.apache.helix.manager.zk.ZkClient} instead.
  */
 public class ZkClient implements Watcher {
-  private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
-  private static long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
+  private static final Logger LOG = LoggerFactory.getLogger(ZkClient.class);
+
+  private static final long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
+
+  // If the number of children exceeds this limit, getChildren() should not retry on connection loss.
+  // This is a workaround for exiting retry on connection loss caused by a large number of children.
+  // TODO: remove it once we have a better way to exit retry for this case
+  private static final int NUM_CHILDREN_LIMIT;
 
   private final IZkConnection _connection;
   private final long _operationRetryTimeoutInMillis;
@@ -90,6 +96,13 @@ public class ZkClient implements Watcher {
   private PathBasedZkSerializer _pathBasedZkSerializer;
   private ZkClientMonitor _monitor;
 
+  static {
+    // 100K is specific to Helix messages, which use UUIDs, making the packet length just below 4 MB.
+    // Set it here (not at declaration) so unit tests can use reflection to change the value,
+    // because the compiler would otherwise inline the constant.
+    NUM_CHILDREN_LIMIT = 100 * 1000;
+  }
+
   private class IZkDataListenerEntry {
     final IZkDataListener _dataListener;
     final boolean _prefetchData;
@@ -713,11 +726,33 @@ public class ZkClient implements Watcher {
 
   protected List<String> getChildren(final String path, final boolean watch) {
     long startT = System.currentTimeMillis();
+
     try {
       List<String> children = retryUntilConnected(new Callable<List<String>>() {
+        private int connectionLossRetryCount = 0;
+
         @Override
         public List<String> call() throws Exception {
-          return getConnection().getChildren(path, watch);
+          try {
+            return getConnection().getChildren(path, watch);
+          } catch (ConnectionLossException e) {
+            // Issue: https://github.com/apache/helix/issues/962
+            // Connection loss might be caused by an excessive number of children.
+            // Infinitely retrying the connection may cause high GC on the ZK server and kill it.
+            // This is a workaround: check numChildren to get a chance to exit the retry loop.
+
+            // Check the numChildren stat only every 3 connection losses, because there is a higher
+            // possibility that the connection loss is caused by other factors such as network
+            // connectivity, session expiry, etc.
+            // TODO: remove this check once we have a better way to exit the infinite retry
+            ++connectionLossRetryCount;
+            if (connectionLossRetryCount >= 3) {
+              checkNumChildrenLimit(path);
+              connectionLossRetryCount = 0;
+            }
+
+            // Re-throw the ConnectionLossException for retryUntilConnected() to catch and retry.
+            throw e;
+          }
         }
       });
       record(path, null, startT, ZkClientMonitor.AccessType.READ);
@@ -1138,6 +1173,12 @@
     }
     try {
       while (true) {
+        // Because ConnectionLossException and SessionExpiredException are caught and not re-thrown,
+        // we would not otherwise know what caused the retry. This records which of the two exceptions
+        // caused the retry, so it can be reported in the ZkTimeoutException.
+        // This also helps the test testConnectionLossWhileCreateEphemeral.
+        KeeperException.Code retryCauseCode;
+
         if (isClosed()) {
           throw new IllegalStateException("ZkClient already closed!");
         }
@@ -1150,10 +1191,12 @@
           }
           return callable.call();
         } catch (ConnectionLossException e) {
+          retryCauseCode = e.code();
           // we give the event thread some time to update the status to 'Disconnected'
           Thread.yield();
           waitForRetry();
         } catch (SessionExpiredException e) {
+          retryCauseCode = e.code();
           // we give the event thread some time to update the status to 'Expired'
           Thread.yield();
           waitForRetry();
@@ -1164,6 +1207,8 @@
         } catch (Exception e) {
           throw ExceptionUtil.convertToRuntimeException(e);
         }
+
+        LOG.debug("Retrying operation, caused by {}", retryCauseCode);
         // before attempting a retry, check whether retry timeout has elapsed
         if (System.currentTimeMillis() - operationStartTime > _operationRetryTimeoutInMillis) {
           throw new ZkTimeoutException("Operation cannot be retried because of retry timeout ("
@@ -1765,4 +1810,25 @@
       }
     }
   }
+
+  private void checkNumChildrenLimit(String path) throws KeeperException {
+    Stat stat = getStat(path);
+    if (stat == null) {
+      return;
+    }
+
+    if (stat.getNumChildren() > NUM_CHILDREN_LIMIT) {
+      LOG.error("Failed to get children for path {} because of connection loss. "
+          + "Number of children {} exceeds limit {}, aborting retry.", path,
+          stat.getNumChildren(), NUM_CHILDREN_LIMIT);
+      // MarshallingErrorException can represent a transport error: exceeding the jute buffer size.
+      // So use it to exit the retry loop and signal that ZK is not able to transport the data
+      // because the packet length is too large.
+      throw new KeeperException.MarshallingErrorException();
+    } else {
+      LOG.debug("Number of children {} is less than limit {}, not exiting retry.",
+          stat.getNumChildren(), NUM_CHILDREN_LIMIT);
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index ce109b7..9cea4c2 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -19,20 +19,10 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkServer;
+import org.I0Itec.zkclient.exception.ZkException;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
@@ -42,12 +32,8 @@ import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.*;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
@@ -55,6 +41,20 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 public class TestRawZkClient extends ZkUnitTestBase {
   private final String TEST_TAG = "test_monitor";
   private final String TEST_ROOT = "/my_cluster/IDEALSTATES";
@@ -412,4 +412,67 @@ public class TestRawZkClient extends ZkUnitTestBase {
       zkClient.delete("/tmp/asyncOversize");
     }
   }
+
+
+  /*
+   * Tests that when getChildren() is called on a path with an excessive number of children and
+   * connection loss happens, the operation terminates and exits the retry loop.
+   */
+  @Test
+  public void testGetChildrenOnLargeNumChildren() throws Exception {
+    // The default packetLen is 4 MB. It is static final and is initialized
+    // when the first zkClient is created.
+    // So we could not just set "jute.maxbuffer" to change the value.
+    // Reflection is needed to change the value.
+    // Remove "final" modifier
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    boolean isModifierAccessible = modifiersField.isAccessible();
+    modifiersField.setAccessible(true);
+
+    Field packetLenField = ClientCnxn.class.getDeclaredField("packetLen");
+    Field childrenLimitField =
+        org.apache.helix.manager.zk.zookeeper.ZkClient.class.getDeclaredField("NUM_CHILDREN_LIMIT");
+    modifiersField.setInt(packetLenField, packetLenField.getModifiers() & ~Modifier.FINAL);
+    modifiersField.setInt(childrenLimitField, childrenLimitField.getModifiers() & ~Modifier.FINAL);
+
+    boolean isPacketLenAccessible = packetLenField.isAccessible();
+    packetLenField.setAccessible(true);
+    int originPacketLen = packetLenField.getInt(null);
+    // Keep 150 bytes for successfully creating each child node.
+    packetLenField.set(null, 150);
+
+    boolean isChildrenLimitAccessible = childrenLimitField.isAccessible();
+    childrenLimitField.setAccessible(true);
+    int originChildrenLimit = childrenLimitField.getInt(null);
+    childrenLimitField.set(null, 2);
+
+    String path = "/" + TestHelper.getTestMethodName();
+    // Create 5 children to make packet length of children exceed 150 bytes
+    // and cause connection loss for getChildren() operation.
+    for (int i = 0; i < 5; i++) {
+      _zkClient.createPersistent(path + "/" + UUID.randomUUID().toString(), true);
+    }
+
+    try {
+      _zkClient.getChildren(path);
+      Assert.fail("Should not successfully get children.");
+    } catch (ZkException expected) {
+      Assert.assertEquals(expected.getMessage(),
+          "org.apache.zookeeper.KeeperException$MarshallingErrorException: "
+              + "KeeperErrorCode = MarshallingError");
+    } finally {
+      packetLenField.set(null, originPacketLen);
+      packetLenField.setAccessible(isPacketLenAccessible);
+
+      childrenLimitField.set(null, originChildrenLimit);
+      childrenLimitField.setAccessible(isChildrenLimitAccessible);
+
+      modifiersField.setAccessible(isModifierAccessible);
+
+      Assert.assertTrue(TestHelper.verify(() -> {
+        _zkClient.deleteRecursively(path);
+        return !_zkClient.exists(path);
+      }, TestHelper.WAIT_DURATION));
+    }
+  }
 }
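
For reviewers, here is a minimal, hypothetical caller-side sketch (not part of this commit) of how the new behavior surfaces to code that lists a znode whose child count exceeds NUM_CHILDREN_LIMIT. The connect string and parent path below are illustrative only; the only APIs assumed are the existing ZkClient constructor, setZkSerializer(), getChildren(), close(), and the ZkException that retryUntilConnected() already throws for KeeperExceptions.

    import java.util.List;

    import org.I0Itec.zkclient.exception.ZkException;
    import org.apache.helix.manager.zk.ZNRecordSerializer;
    import org.apache.helix.manager.zk.ZkClient;

    public class LargeChildrenListingExample {
      public static void main(String[] args) {
        // Illustrative connect string and path; substitute values for a real cluster.
        ZkClient zkClient = new ZkClient("localhost:2181");
        zkClient.setZkSerializer(new ZNRecordSerializer());
        String messagesPath = "/EXAMPLE-CLUSTER/INSTANCES/host_12913/MESSAGES";
        try {
          // Before this patch, a response larger than the client-side jute.maxbuffer (4 MB by
          // default) triggered ConnectionLossException and ZkClient retried until the operation
          // retry timeout. With this patch, once the parent's numChildren stat exceeds
          // NUM_CHILDREN_LIMIT, the retry loop is aborted and the failure surfaces as a
          // ZkException wrapping KeeperException.MarshallingErrorException.
          List<String> children = zkClient.getChildren(messagesPath);
          System.out.println("Found " + children.size() + " children");
        } catch (ZkException e) {
          // Fail fast: clean up the oversized path or alert an operator instead of retrying forever.
          System.err.println("getChildren failed for " + messagesPath + ": " + e.getMessage());
        } finally {
          zkClient.close();
        }
      }
    }

If a large child count is legitimate, the standard ZooKeeper client setting -Djute.maxbuffer=<bytes> can raise the 4 MB client-side packet limit, but that only moves the threshold; the fail-fast path in this commit targets the pathological case described in https://github.com/apache/helix/issues/962.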
