This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f53594d8aad42d1de44fa8a33acc043cbd4c1d57
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Fri Mar 1 16:26:48 2019 +0100

    [FLINK-11336][zk] Delete ZNodes when ZooKeeperHaServices#closeAndCleanupAllData
    
    When calling ZooKeeperHaServices#closeAndCleanupAllData we should delete the
    HA_CLUSTER_ID znode which is owned by the respective ZooKeeperHaServices.
    Moreover, the method tries to go up the chain of parent znodes and tries to
    delete all empty parent nodes. This should clean up otherwise orphaned
    parent znodes.
---
 .../zookeeper/ZooKeeperHaServices.java             |  62 +++++
 .../zookeeper/ZooKeeperHaServicesTest.java         | 250 +++++++++++++++++++++
 .../ZooKeeperLeaderRetrievalTest.java              |  12 +-
 3 files changed, 318 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 8b33c2c..1b2ff44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -34,9 +34,13 @@ import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.concurrent.Executor;
 
@@ -224,6 +228,12 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
                        exception = t;
                }
 
+               try {
+                       cleanupZooKeeperPaths();
+               } catch (Throwable t) {
+                       exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+               }
+
                internalClose();
 
                if (exception != null) {
@@ -232,6 +242,58 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
        }
 
        /**
+        * Cleans up leftover ZooKeeper paths.
+        */
+       private void cleanupZooKeeperPaths() throws Exception {
+               deleteOwnedZNode();
+               tryDeleteEmptyParentZNodes();
+       }
+
+       private void deleteOwnedZNode() throws Exception {
+               // delete the HA_CLUSTER_ID znode which is owned by this cluster
+               client.delete().deletingChildrenIfNeeded().forPath("/");
+       }
+
+       /**
+        * Tries to delete empty parent znodes.
+        *
+        * <p>IMPORTANT: This method can be removed once all supported 
ZooKeeper versions
+        * support the container {@link org.apache.zookeeper.CreateMode}.
+        *
+        * @throws Exception if the deletion fails for a reason other than {@link KeeperException.NotEmptyException}
+        */
+       private void tryDeleteEmptyParentZNodes() throws Exception {
+               // try to delete the parent znodes if they are empty
+               String remainingPath = 
getParentPath(getNormalizedPath(client.getNamespace()));
+               final CuratorFramework nonNamespaceClient = 
client.usingNamespace(null);
+
+               while (!isRootPath(remainingPath)) {
+                       try {
+                               
nonNamespaceClient.delete().forPath(remainingPath);
+                       } catch (KeeperException.NotEmptyException ignored) {
+                               // We can only delete empty znodes
+                               break;
+                       }
+
+                       remainingPath = getParentPath(remainingPath);
+               }
+       }
+
+       private static boolean isRootPath(String remainingPath) {
+               return ZKPaths.PATH_SEPARATOR.equals(remainingPath);
+       }
+
+       @Nonnull
+       private static String getNormalizedPath(String path) {
+               return ZKPaths.makePath(path, "");
+       }
+
+       @Nonnull
+       private static String getParentPath(String path) {
+               return ZKPaths.getPathAndNode(path).getPath();
+       }
+
+       /**
         * Closes components which don't distinguish between close and 
closeAndCleanupAllData.
         */
        private void internalClose() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
new file mode 100644
index 0000000..0edfdb9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.zookeeper;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingContender;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link ZooKeeperHaServices}.
+ */
+public class ZooKeeperHaServicesTest extends TestLogger {
+
+       @ClassRule
+       public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new 
ZooKeeperResource();
+
+       private static CuratorFramework client;
+
+       @BeforeClass
+       public static void setupClass() {
+               client = startCuratorFramework();
+               client.start();
+       }
+
+       @Before
+       public void setup() throws Exception {
+               final List<String> children = client.getChildren().forPath("/");
+
+               for (String child : children) {
+                       if (!child.equals("zookeeper")) {
+                               
client.delete().deletingChildrenIfNeeded().forPath('/' + child);
+                       }
+               }
+       }
+
+       @AfterClass
+       public static void teardownClass() {
+               if (client != null) {
+                       client.close();
+               }
+       }
+
+       /**
+        * Tests that a simple {@link ZooKeeperHaServices#close()} does not 
delete ZooKeeper paths.
+        */
+       @Test
+       public void testSimpleClose() throws Exception {
+               final String rootPath = "/foo/bar/flink";
+               final Configuration configuration = 
createConfiguration(rootPath);
+
+               final TestingBlobStoreService blobStoreService = new 
TestingBlobStoreService();
+
+               runCleanupTest(
+                       configuration,
+                       blobStoreService,
+                       ZooKeeperHaServices::close);
+
+               assertThat(blobStoreService.isClosed(), is(true));
+               assertThat(blobStoreService.isClosedAndCleanedUpAllData(), 
is(false));
+
+               final List<String> children = 
client.getChildren().forPath(rootPath);
+               assertThat(children, is(not(empty())));
+       }
+
+       /**
+        * Tests that the {@link ZooKeeperHaServices} cleans up all paths if
+        * it is closed via {@link 
ZooKeeperHaServices#closeAndCleanupAllData()}.
+        */
+       @Test
+       public void testSimpleCloseAndCleanupAllData() throws Exception {
+               final Configuration configuration = 
createConfiguration("/foo/bar/flink");
+
+               final TestingBlobStoreService blobStoreService = new 
TestingBlobStoreService();
+
+               final List<String> initialChildren = 
client.getChildren().forPath("/");
+
+               runCleanupTest(
+                       configuration,
+                       blobStoreService,
+                       ZooKeeperHaServices::closeAndCleanupAllData);
+
+               assertThat(blobStoreService.isClosedAndCleanedUpAllData(), 
is(true));
+
+               final List<String> children = client.getChildren().forPath("/");
+               assertThat(children, is(equalTo(initialChildren)));
+       }
+
+       /**
+        * Tests that we can only delete the parent znodes as long as they are 
empty.
+        */
+       @Test
+       public void testCloseAndCleanupAllDataWithUncle() throws Exception {
+               final String prefix = "/foo/bar";
+               final String flinkPath = prefix + "/flink";
+               final Configuration configuration = 
createConfiguration(flinkPath);
+
+               final TestingBlobStoreService blobStoreService = new 
TestingBlobStoreService();
+
+               final String unclePath = prefix + "/foobar";
+               
client.create().creatingParentContainersIfNeeded().forPath(unclePath);
+
+               runCleanupTest(
+                       configuration,
+                       blobStoreService,
+                       ZooKeeperHaServices::closeAndCleanupAllData);
+
+               assertThat(blobStoreService.isClosedAndCleanedUpAllData(), 
is(true));
+
+               assertThat(client.checkExists().forPath(flinkPath), 
is(nullValue()));
+               assertThat(client.checkExists().forPath(unclePath), 
is(notNullValue()));
+       }
+
+       private static CuratorFramework startCuratorFramework() {
+               return CuratorFrameworkFactory.builder()
+                               
.connectString(ZOO_KEEPER_RESOURCE.getConnectString())
+                               .retryPolicy(new RetryNTimes(50, 100))
+                               .build();
+       }
+
+       @Nonnull
+       private Configuration createConfiguration(String rootPath) {
+               final Configuration configuration = new Configuration();
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
ZOO_KEEPER_RESOURCE.getConnectString());
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT, rootPath);
+               return configuration;
+       }
+
+       private void runCleanupTest(
+                       Configuration configuration,
+                       TestingBlobStoreService blobStoreService,
+                       ThrowingConsumer<ZooKeeperHaServices, Exception> 
zooKeeperHaServicesConsumer) throws Exception {
+               try (ZooKeeperHaServices zooKeeperHaServices = new 
ZooKeeperHaServices(
+                       ZooKeeperUtils.startCuratorFramework(configuration),
+                       Executors.directExecutor(),
+                       configuration,
+                       blobStoreService)) {
+
+                       // create some Zk services to trigger the generation of 
paths
+                       final LeaderRetrievalService 
resourceManagerLeaderRetriever = 
zooKeeperHaServices.getResourceManagerLeaderRetriever();
+                       final LeaderElectionService 
resourceManagerLeaderElectionService = 
zooKeeperHaServices.getResourceManagerLeaderElectionService();
+                       final RunningJobsRegistry runningJobsRegistry = 
zooKeeperHaServices.getRunningJobsRegistry();
+
+                       resourceManagerLeaderRetriever.start(new 
TestingListener());
+                       resourceManagerLeaderElectionService.start(new 
TestingContender("foobar", resourceManagerLeaderElectionService));
+                       final JobID jobId = new JobID();
+                       runningJobsRegistry.setJobRunning(jobId);
+
+                       resourceManagerLeaderRetriever.stop();
+                       resourceManagerLeaderElectionService.stop();
+                       runningJobsRegistry.clearJob(jobId);
+
+                       zooKeeperHaServicesConsumer.accept(zooKeeperHaServices);
+               }
+       }
+
+       private static class TestingBlobStoreService implements 
BlobStoreService {
+
+               private boolean closedAndCleanedUpAllData = false;
+               private boolean closed = false;
+
+               @Override
+               public void closeAndCleanupAllData() {
+                       closedAndCleanedUpAllData = true;
+               }
+
+               @Override
+               public void close() throws IOException {
+                       closed = true;
+               }
+
+               @Override
+               public boolean put(File localFile, JobID jobId, BlobKey 
blobKey) {
+                       return false;
+               }
+
+               @Override
+               public boolean delete(JobID jobId, BlobKey blobKey) {
+                       return false;
+               }
+
+               @Override
+               public boolean deleteAll(JobID jobId) {
+                       return false;
+               }
+
+               @Override
+               public boolean get(JobID jobId, BlobKey blobKey, File 
localFile) {
+                       return false;
+               }
+
+               private boolean isClosed() {
+                       return closed;
+               }
+
+               private boolean isClosedAndCleanedUpAllData() {
+                       return closedAndCleanedUpAllData;
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 4a1cf80..d4e5dd9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -81,17 +81,17 @@ public class ZooKeeperLeaderRetrievalTest extends 
TestLogger{
 
        @After
        public void after() throws Exception {
-               if (testingServer != null) {
-                       testingServer.stop();
-
-                       testingServer = null;
-               }
-
                if (highAvailabilityServices != null) {
                        highAvailabilityServices.closeAndCleanupAllData();
 
                        highAvailabilityServices = null;
                }
+
+               if (testingServer != null) {
+                       testingServer.stop();
+
+                       testingServer = null;
+               }
        }
 
        /**

Reply via email to