This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 892fc2710 Add atomic recursive delete to ZK client and use for drop
instance (#2994)
892fc2710 is described below
commit 892fc2710c994261ccc46236c3ff9d66c2af32ea
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Mon Jan 27 17:25:12 2025 -0800
Add atomic recursive delete to ZK client and use for drop instance (#2994)
Add atomic recursive delete to ZK client and use for drop instance
---
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 19 ++---
.../apache/helix/manager/zk/TestZkHelixAdmin.java | 27 ++++++-
.../zookeeper/api/client/RealmAwareZkClient.java | 3 +
.../zookeeper/impl/client/DedicatedZkClient.java | 14 ++++
.../zookeeper/impl/client/FederatedZkClient.java | 15 ++++
.../zookeeper/impl/client/SharedZkClient.java | 14 ++++
.../apache/helix/zookeeper/zkclient/ZkClient.java | 83 ++++++++++++++++++++++
.../zookeeper/impl/client/TestRawZkClient.java | 56 +++++++++++++++
8 files changed, 217 insertions(+), 14 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index d5998a166..ae914682b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import com.google.common.collect.ImmutableMap;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -274,18 +273,16 @@ public class ZKHelixAdmin implements HelixAdmin {
"Node " + instanceName + " is still alive for cluster " +
clusterName + ", can't drop.");
}
- // delete config path
- String instanceConfigsPath =
PropertyPathBuilder.instanceConfig(clusterName);
- ZKUtil.dropChildren(_zkClient, instanceConfigsPath,
instanceConfig.getRecord());
- // delete instance path
- dropInstancePathRecursively(instancePath,
instanceConfig.getInstanceName());
+ dropInstancePathsRecursively(clusterName, instanceName);
}
- private void dropInstancePathRecursively(String instancePath, String
instanceName) {
+ private void dropInstancePathsRecursively(String clusterName, String
instanceName) {
+ String instanceConfigPath =
PropertyPathBuilder.instanceConfig(clusterName, instanceName);
+ String instancePath = PropertyPathBuilder.instance(clusterName,
instanceName);
int retryCnt = 0;
while (true) {
try {
- _zkClient.deleteRecursively(instancePath);
+ _zkClient.deleteRecursivelyAtomic(Arrays.asList(instancePath,
instanceConfigPath));
return;
} catch (ZkClientException e) {
if (retryCnt < 3 && e.getCause() instanceof ZkException && e.getCause()
@@ -333,11 +330,7 @@ public class ZKHelixAdmin implements HelixAdmin {
private void purgeInstance(String clusterName, String instanceName) {
logger.info("Purge instance {} from cluster {}.", instanceName,
clusterName);
-
- String instanceConfigPath =
PropertyPathBuilder.instanceConfig(clusterName, instanceName);
- _zkClient.delete(instanceConfigPath);
- String instancePath = PropertyPathBuilder.instance(clusterName,
instanceName);
- dropInstancePathRecursively(instancePath, instanceName);
+ dropInstancePathsRecursively(clusterName, instanceName);
}
@Override
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index b54be8c04..b60b909a2 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -237,7 +237,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
new ZkException("ZkException: failed to delete " + instancePath,
new KeeperException.NotEmptyException(
"NotEmptyException: directory" + instancePath + " is not
empty"))))
- .when(mockZkClient).deleteRecursively(instancePath);
+
.when(mockZkClient).deleteRecursivelyAtomic(Arrays.asList(instancePath,
instanceConfigPath));
HelixAdmin helixAdminMock = new ZKHelixAdmin(mockZkClient);
try {
@@ -1342,4 +1342,29 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
_gSetupTool.deleteCluster(clusterName);
}
}
+
+  /**
+   * Verifies that dropInstance removes every instance of a cluster, including instances whose
+   * instance path still has child nodes (dummy messages) underneath it.
+   */
+  @Test
+  public void testDropInstance() {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+    int numInstances = 5;
+    final String clusterName = getShortClassName();
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+    try {
+      Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");
+
+      // Add instances to cluster
+      for (int i = 0; i < numInstances; i++) {
+        admin.addInstance(clusterName, new InstanceConfig("localhost_" + i));
+        // Create dummy message nodes so that each instance path has a non-empty subtree
+        _gZkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, "localhost_" + i, "" + i));
+      }
+      Assert.assertEquals(admin.getInstancesInCluster(clusterName).size(), numInstances,
+          "Instances should be added");
+
+      // Drop exactly the instances that were added above
+      for (int i = 0; i < numInstances; i++) {
+        admin.dropInstance(clusterName, new InstanceConfig("localhost_" + i));
+      }
+      Assert.assertTrue(admin.getInstancesInCluster(clusterName).isEmpty(), "Instances should be removed");
+    } finally {
+      // Clean up so the cluster does not leak into other tests
+      _gSetupTool.deleteCluster(clusterName);
+    }
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
}
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index e24cc6e9c..e18e86877 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -252,6 +252,9 @@ public interface RealmAwareZkClient {
void deleteRecursively(String path);
+ /**
+ * Delete the path as well as all its children as one atomic operation: either all nodes are
+ * deleted or none.
+ * @param path ZK path to delete
+ */
+ void deleteRecursivelyAtomic(String path);
+ /**
+ * Delete the paths as well as all their children as one atomic operation: either all nodes are
+ * deleted or none.
+ * @param paths ZK paths to delete
+ */
+ void deleteRecursivelyAtomic(List<String> paths);
+
boolean delete(final String path);
boolean delete(final String path, final int expectedVersion);
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
index a1a06f48d..e21480629 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
@@ -383,6 +383,20 @@ public class DedicatedZkClient implements
RealmAwareZkClient {
_rawZkClient.deleteRecursively(path);
}
+  /**
+   * Runs the sharding-key check on the path, then hands the atomic recursive delete to the
+   * underlying raw ZkClient.
+   * @param path ZK path to delete together with all of its children
+   */
+  @Override
+  public void deleteRecursivelyAtomic(String path) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.deleteRecursivelyAtomic(path);
+  }
+
+  /**
+   * Runs the sharding-key check on every path, then hands the whole list to the underlying raw
+   * ZkClient for a single atomic recursive delete.
+   * @param paths ZK paths to delete together with all of their children
+   */
+  @Override
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    // Every path is checked before anything is sent to the raw client
+    paths.forEach(path -> checkIfPathContainsShardingKey(path));
+    _rawZkClient.deleteRecursivelyAtomic(paths);
+  }
+
@Override
public boolean delete(String path) {
return delete(path, -1);
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 219ad9703..7943bc27a 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -44,6 +44,7 @@ import
org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
@@ -370,6 +371,20 @@ public class FederatedZkClient implements
RealmAwareZkClient {
getZkClient(path).deleteRecursively(path);
}
+  /**
+   * Looks up the ZkClient responsible for the given path and asks it to atomically delete the
+   * path together with all of its children.
+   * @param path ZK path to delete
+   */
+  @Override
+  public void deleteRecursivelyAtomic(String path) {
+    getZkClient(path).deleteRecursivelyAtomic(path);
+  }
+
+  /**
+   * Atomically deletes the given paths and all of their children. All paths must map to the same
+   * ZK realm, because a single client has to execute the delete for it to be atomic.
+   * @param paths ZK paths to delete; an empty list is a no-op
+   * @throws IllegalArgumentException if the paths map to more than one realm
+   */
+  @Override
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    // Nothing to delete; without this guard paths.get(0) below would throw on an empty list
+    if (paths.isEmpty()) {
+      return;
+    }
+    // Check if all paths are in the same realm. If not, throw error as we cannot guarantee atomicity across clients.
+    if (paths.stream().map(this::getZkRealm).distinct().count() > 1) {
+      throw new IllegalArgumentException("Cannot atomically delete paths across different realms");
+    }
+    getZkClient(paths.get(0)).deleteRecursivelyAtomic(paths);
+  }
+
@Override
public boolean delete(String path) {
return delete(path, -1);
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
index 367628868..210e82bd6 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
@@ -412,6 +412,20 @@ public class SharedZkClient implements RealmAwareZkClient {
_innerSharedZkClient.deleteRecursively(path);
}
+  /**
+   * Runs the sharding-key check on the path, then hands the atomic recursive delete to the inner
+   * shared ZkClient.
+   * @param path ZK path to delete together with all of its children
+   */
+  @Override
+  public void deleteRecursivelyAtomic(String path) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.deleteRecursivelyAtomic(path);
+  }
+
+  /**
+   * Runs the sharding-key check on every path, then hands the whole list to the inner shared
+   * ZkClient for a single atomic recursive delete.
+   * @param paths ZK paths to delete together with all of their children
+   */
+  @Override
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    // Every path is checked before anything is sent to the inner client
+    paths.forEach(path -> checkIfPathContainsShardingKey(path));
+    _innerSharedZkClient.deleteRecursivelyAtomic(paths);
+  }
+
@Override
public boolean delete(String path) {
return delete(path, -1);
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 9d21e7604..33b8bc185 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -20,12 +20,17 @@ package org.apache.helix.zookeeper.zkclient;
*/
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.OptionalLong;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -1820,6 +1825,84 @@ public class ZkClient implements Watcher {
}
}
+  /**
+   * Atomically remove a single path together with its whole subtree: either every node is
+   * deleted or none is. Convenience overload that wraps the path in a one-element list.
+   * The call may fail if another agent is concurrently creating or deleting nodes under the path.
+   * @param path ZK path to delete
+   */
+  public void deleteRecursivelyAtomic(String path) {
+    List<String> singlePath = Collections.singletonList(path);
+    deleteRecursivelyAtomic(singlePath);
+  }
+
+  /**
+   * Delete the paths as well as all their children. The delete operations for every node are
+   * submitted together in a single multi call, so this operation is atomic and will either
+   * delete all nodes or none.
+   * This operation may fail if another agent is concurrently creating or deleting nodes under any of the paths.
+   * @param paths ZK paths to delete; paths that do not exist contribute no operations
+   * @throws ZkClientException if the multi call throws, or if any operation result is an error
+   */
+  public void deleteRecursivelyAtomic(List<String> paths) {
+    List<Op> ops = new ArrayList<>();
+    List<OpResult> opResults;
+    for (String path : paths) {
+      ops.addAll(getOpsForRecursiveDelete(path));
+    }
+
+    // Return early if no operations to execute
+    if (ops.isEmpty()) {
+      return;
+    }
+
+    try {
+      opResults = multi(ops);
+    } catch (Exception e) {
+      LOG.error("zkclient {}, Failed to delete paths {}, exception {}", _uid, paths, e);
+      throw new ZkClientException("Failed to delete paths " + paths, e);
+    }
+
+    // Check if any of the operations failed. Create mapping of failed paths to error codes
+    Map<String, KeeperException.Code> failedPathsMap = new HashMap<>();
+    for (int i = 0; i < opResults.size(); i++) {
+      if (opResults.get(i) instanceof OpResult.ErrorResult) {
+        failedPathsMap.put(ops.get(i).getPath(),
+            KeeperException.Code.get(((OpResult.ErrorResult) opResults.get(i)).getErr()));
+      }
+    }
+
+    // Log and throw exception if any of the operations failed
+    if (!failedPathsMap.isEmpty()) {
+      // values() holds the error codes and keySet() the failed sub-paths; pass them in the order the messages name them
+      LOG.error("zkclient {}, Failed to delete paths {}, multi returned with error codes {} for sub-paths {}",
+          _uid, paths, failedPathsMap.values(), failedPathsMap.keySet());
+      throw new ZkClientException("Failed to delete paths " + paths + " with ZK KeeperException error codes: "
+          + failedPathsMap.values() + " for paths: " + failedPathsMap.keySet());
+    }
+  }
+
+  /**
+   * Build the delete operations needed to remove the given root and every node beneath it.
+   * The returned list is ordered so that the delete of a child always precedes the delete of its parent.
+   * @param root the root node to delete
+   * @return the delete operations for the whole subtree, or an empty list if the root does not exist
+   */
+  private List<Op> getOpsForRecursiveDelete(String root) {
+    List<Op> deleteOps = new ArrayList<>();
+    if (exists(root)) {
+      // Breadth-first walk: the op for a parent is always recorded before the ops for its children
+      Queue<String> pending = new LinkedList<>();
+      pending.offer(root);
+      while (!pending.isEmpty()) {
+        String current = pending.poll();
+        for (String childName : getChildren(current, false)) {
+          pending.offer(current + "/" + childName);
+        }
+        deleteOps.add(Op.delete(current, -1));
+      }
+      // Flip the parent-first order so that children are deleted before their parents
+      Collections.reverse(deleteOps);
+    }
+    return deleteOps;
+  }
+
private void processDataOrChildChange(WatchedEvent event, long
notificationTime) {
final String path = event.getPath();
final boolean pathExists = event.getType() != EventType.NodeDeleted;
diff --git
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
index e5b589ff8..0ce0349ab 100644
---
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
+++
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
@@ -21,6 +21,7 @@ package org.apache.helix.zookeeper.impl.client;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -51,6 +52,7 @@ import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import
org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
@@ -1231,4 +1233,58 @@ public class TestRawZkClient extends ZkTestBase {
}
}
}
+
+  /**
+   * Exercises deleteRecursivelyAtomic: rejected inputs (duplicate path, path nested under another
+   * listed path) must leave the tree untouched, a single node can be deleted, and a list that mixes
+   * existing and non-existent paths is deleted successfully.
+   */
+  @Test
+  void testDeleteRecursivelyAtomic() {
+    System.out.println("Start test: " + TestHelper.getTestMethodName());
+    String grandParent = "/testDeleteRecursively";
+    String parent = grandParent + "/parent";
+    String child1 = parent + "/child1";
+    String child2 = parent + "/child2";
+    _zkClient.createPersistent(grandParent);
+    _zkClient.createPersistent(parent);
+    _zkClient.createPersistent(child1);
+    _zkClient.createPersistent(child2);
+    Assert.assertTrue(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());
+
+    // Test calling delete on same path twice
+    try {
+      _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, grandParent));
+      Assert.fail("Operation should not succeed when attempting to delete same path twice");
+    } catch (ZkClientException expected) {
+      // Caught expected exception
+    }
+
+    // The failed call must not have deleted anything
+    Assert.assertTrue(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());
+
+    // Test calling delete on path that is child of another path in the list
+    try {
+      _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, parent));
+      Assert.fail("Operation should not succeed when attempting to delete a path that is a child of another path in the list");
+    } catch (ZkClientException expected) {
+      // Caught expected exception
+    }
+
+    // The failed call must not have deleted anything
+    Assert.assertTrue(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());
+
+    // Test calling delete on single node
+    Assert.assertTrue(_zkClient.exists(child2));
+    _zkClient.deleteRecursivelyAtomic(child2);
+    Assert.assertFalse(_zkClient.exists(child2));
+
+    Assert.assertTrue(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());
+
+    // Test successfully delete multiple paths. Also that operation succeeds when attempting to delete non-existent path
+    String newNode = "/newNode";
+    _zkClient.createPersistent(newNode);
+    Assert.assertTrue(_zkClient.exists(newNode));
+
+    String nonexistentPath = grandParent + "/nonexistent";
+    Assert.assertFalse(_zkClient.exists(nonexistentPath));
+
+    _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, newNode, nonexistentPath));
+    Assert.assertFalse(_zkClient.exists(grandParent));
+    Assert.assertFalse(_zkClient.exists(newNode));
+  }
}