This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a7bd6de71de [FLINK-32438][runtime] Merges AbstractZooKeeperHaServices with ZooKeeperMultipleComponentLeaderElectionHaServices
a7bd6de71de is described below

commit a7bd6de71dea56307ca9240fee0c6000d11993e5
Author: Matthias Pohl <matthias.p...@aiven.io>
AuthorDate: Mon Jun 26 13:03:43 2023 +0200

    [FLINK-32438][runtime] Merges AbstractZooKeeperHaServices with ZooKeeperMultipleComponentLeaderElectionHaServices
    
    Signed-off-by: Matthias Pohl <matthias.p...@aiven.io>
---
 .../zookeeper/AbstractZooKeeperHaServices.java     | 148 ------------------
 ...rMultipleComponentLeaderElectionHaServices.java | 173 ++++++++++++++++-----
 ...eeperMultipleComponentLeaderElectionDriver.java |   4 +-
 3 files changed, 139 insertions(+), 186 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/AbstractZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/AbstractZooKeeperHaServices.java
deleted file mode 100644
index 0b56fe256a7..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/AbstractZooKeeperHaServices.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability.zookeeper;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobStoreService;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
-import org.apache.flink.runtime.highavailability.AbstractHaServices;
-import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
-import org.apache.flink.runtime.jobmanager.JobGraphStore;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
-
-import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.shaded.curator5.org.apache.curator.utils.ZKPaths;
-import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
-
-import java.io.IOException;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/** Abstract ZooKeeper based HA services. */
-public abstract class AbstractZooKeeperHaServices extends AbstractHaServices {
-    /** The curator resource to use. */
-    private final CuratorFrameworkWithUnhandledErrorListener 
curatorFrameworkWrapper;
-
-    public AbstractZooKeeperHaServices(
-            CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper,
-            Executor executor,
-            Configuration configuration,
-            BlobStoreService blobStoreService)
-            throws IOException {
-        super(
-                configuration,
-                executor,
-                blobStoreService,
-                FileSystemJobResultStore.fromConfiguration(configuration));
-        this.curatorFrameworkWrapper = checkNotNull(curatorFrameworkWrapper);
-    }
-
-    protected final CuratorFramework getCuratorFramework() {
-        return curatorFrameworkWrapper.asCuratorFramework();
-    }
-
-    @Override
-    public CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws 
Exception {
-        return new ZooKeeperCheckpointRecoveryFactory(
-                ZooKeeperUtils.useNamespaceAndEnsurePath(
-                        curatorFrameworkWrapper.asCuratorFramework(), 
ZooKeeperUtils.getJobsPath()),
-                configuration,
-                ioExecutor);
-    }
-
-    @Override
-    public JobGraphStore createJobGraphStore() throws Exception {
-        return ZooKeeperUtils.createJobGraphs(
-                curatorFrameworkWrapper.asCuratorFramework(), configuration);
-    }
-
-    @Override
-    protected void internalClose() throws Exception {
-        curatorFrameworkWrapper.close();
-    }
-
-    @Override
-    protected void internalCleanup() throws Exception {
-        cleanupZooKeeperPaths();
-    }
-
-    @Override
-    protected void internalCleanupJobData(JobID jobID) throws Exception {
-        deleteZNode(ZooKeeperUtils.getLeaderPathForJob(jobID));
-    }
-
-    /** Cleans up leftover ZooKeeper paths. */
-    private void cleanupZooKeeperPaths() throws Exception {
-        deleteOwnedZNode();
-        tryDeleteEmptyParentZNodes();
-    }
-
-    private void deleteOwnedZNode() throws Exception {
-        deleteZNode("/");
-    }
-
-    protected void deleteZNode(String path) throws Exception {
-        
ZooKeeperUtils.deleteZNode(curatorFrameworkWrapper.asCuratorFramework(), path);
-    }
-
-    /**
-     * Tries to delete empty parent znodes.
-     *
-     * <p>IMPORTANT: This method can be removed once all supported ZooKeeper 
versions support the
-     * container {@link org.apache.zookeeper.CreateMode}.
-     *
-     * @throws Exception if the deletion fails for other reason than {@link
-     *     KeeperException.NotEmptyException}
-     */
-    private void tryDeleteEmptyParentZNodes() throws Exception {
-        // try to delete the parent znodes if they are empty
-        String remainingPath =
-                getParentPath(
-                        getNormalizedPath(
-                                
curatorFrameworkWrapper.asCuratorFramework().getNamespace()));
-        final CuratorFramework nonNamespaceClient =
-                
curatorFrameworkWrapper.asCuratorFramework().usingNamespace(null);
-
-        while (!isRootPath(remainingPath)) {
-            try {
-                nonNamespaceClient.delete().forPath(remainingPath);
-            } catch (KeeperException.NotEmptyException ignored) {
-                // We can only delete empty znodes
-                break;
-            }
-
-            remainingPath = getParentPath(remainingPath);
-        }
-    }
-
-    private static boolean isRootPath(String remainingPath) {
-        return ZKPaths.PATH_SEPARATOR.equals(remainingPath);
-    }
-
-    private static String getNormalizedPath(String path) {
-        return ZKPaths.makePath(path, "");
-    }
-
-    private static String getParentPath(String path) {
-        return ZKPaths.getPathAndNode(path).getPath();
-    }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java
index bc95589f3bc..6298a13e4e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java
@@ -21,6 +21,11 @@ package org.apache.flink.runtime.highavailability.zookeeper;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.AbstractHaServices;
+import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
 import 
org.apache.flink.runtime.leaderelection.DefaultMultipleComponentLeaderElectionService;
 import 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverFactory;
 import 
org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionService;
@@ -32,12 +37,17 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import 
org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.curator5.org.apache.curator.utils.ZKPaths;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
+import java.io.IOException;
 import java.util.concurrent.Executor;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * ZooKeeper HA services that only use a single leader election per process.
  *
@@ -59,13 +69,12 @@ import java.util.concurrent.Executor;
  *      |                 |       /checkpoint_id_counter
  * </pre>
  */
-public class ZooKeeperMultipleComponentLeaderElectionHaServices
-        extends AbstractZooKeeperHaServices {
+public class ZooKeeperMultipleComponentLeaderElectionHaServices extends 
AbstractHaServices {
+    /** The curator resource to use. */
+    private final CuratorFrameworkWithUnhandledErrorListener 
curatorFrameworkWrapper;
 
     private final Object lock = new Object();
 
-    private final CuratorFramework leaderNamespacedCuratorFramework;
-
     private final FatalErrorHandler fatalErrorHandler;
 
     @Nullable
@@ -74,18 +83,129 @@ public class ZooKeeperMultipleComponentLeaderElectionHaServices
 
     public ZooKeeperMultipleComponentLeaderElectionHaServices(
             CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper,
-            Configuration config,
-            Executor ioExecutor,
+            Configuration configuration,
+            Executor executor,
             BlobStoreService blobStoreService,
             FatalErrorHandler fatalErrorHandler)
-            throws Exception {
-        super(curatorFrameworkWrapper, ioExecutor, config, blobStoreService);
-        this.leaderNamespacedCuratorFramework =
-                ZooKeeperUtils.useNamespaceAndEnsurePath(
-                        getCuratorFramework(), ZooKeeperUtils.getLeaderPath());
+            throws IOException {
+        super(
+                configuration,
+                executor,
+                blobStoreService,
+                FileSystemJobResultStore.fromConfiguration(configuration));
+        this.curatorFrameworkWrapper = checkNotNull(curatorFrameworkWrapper);
+
         this.fatalErrorHandler = fatalErrorHandler;
     }
 
+    @Override
+    public CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws 
Exception {
+        return new ZooKeeperCheckpointRecoveryFactory(
+                ZooKeeperUtils.useNamespaceAndEnsurePath(
+                        curatorFrameworkWrapper.asCuratorFramework(), 
ZooKeeperUtils.getJobsPath()),
+                configuration,
+                ioExecutor);
+    }
+
+    @Override
+    public JobGraphStore createJobGraphStore() throws Exception {
+        return ZooKeeperUtils.createJobGraphs(
+                curatorFrameworkWrapper.asCuratorFramework(), configuration);
+    }
+
+    @Override
+    protected void internalClose() throws Exception {
+        Exception exception = null;
+        synchronized (lock) {
+            if (multipleComponentLeaderElectionService != null) {
+                try {
+                    multipleComponentLeaderElectionService.close();
+                } catch (Exception e) {
+                    exception = e;
+                }
+                multipleComponentLeaderElectionService = null;
+            }
+        }
+
+        try {
+            curatorFrameworkWrapper.close();
+        } catch (Exception e) {
+            exception = ExceptionUtils.firstOrSuppressed(e, exception);
+        }
+
+        ExceptionUtils.tryRethrowException(exception);
+    }
+
+    @Override
+    protected void internalCleanup() throws Exception {
+        cleanupZooKeeperPaths();
+    }
+
+    @Override
+    protected void internalCleanupJobData(JobID jobID) throws Exception {
+        deleteZNode(ZooKeeperUtils.getLeaderPathForJob(jobID));
+    }
+
+    /** Cleans up leftover ZooKeeper paths. */
+    private void cleanupZooKeeperPaths() throws Exception {
+        deleteOwnedZNode();
+        tryDeleteEmptyParentZNodes();
+    }
+
+    private void deleteOwnedZNode() throws Exception {
+        deleteZNode("/");
+    }
+
+    protected void deleteZNode(String path) throws Exception {
+        
ZooKeeperUtils.deleteZNode(curatorFrameworkWrapper.asCuratorFramework(), path);
+    }
+
+    /**
+     * Tries to delete empty parent znodes.
+     *
+     * <p>IMPORTANT: This method can be removed once all supported ZooKeeper 
versions support the
+     * container {@link org.apache.zookeeper.CreateMode}.
+     *
+     * @throws Exception if the deletion fails for other reason than {@link
+     *     KeeperException.NotEmptyException}
+     */
+    private void tryDeleteEmptyParentZNodes() throws Exception {
+        // try to delete the parent znodes if they are empty
+        String remainingPath =
+                getParentPath(
+                        getNormalizedPath(
+                                
curatorFrameworkWrapper.asCuratorFramework().getNamespace()));
+        final CuratorFramework nonNamespaceClient =
+                
curatorFrameworkWrapper.asCuratorFramework().usingNamespace(null);
+
+        while (!isRootPath(remainingPath)) {
+            try {
+                nonNamespaceClient.delete().forPath(remainingPath);
+            } catch (KeeperException.NotEmptyException ignored) {
+                // We can only delete empty znodes
+                break;
+            }
+
+            remainingPath = getParentPath(remainingPath);
+        }
+    }
+
+    private static boolean isRootPath(String remainingPath) {
+        return ZKPaths.PATH_SEPARATOR.equals(remainingPath);
+    }
+
+    private static String getNormalizedPath(String path) {
+        return ZKPaths.makePath(path, "");
+    }
+
+    private static String getParentPath(String path) {
+        return ZKPaths.getPathAndNode(path).getPath();
+    }
+
+    // ///////////////////////////////////////////////
+    // LeaderElection/-Retrieval-related methods
+    // ///////////////////////////////////////////////
+
     @Override
     protected MultipleComponentLeaderElectionDriverFactory 
createLeaderElectionDriverFactory(
             String leaderName) {
@@ -100,7 +220,9 @@ public class ZooKeeperMultipleComponentLeaderElectionHaServices
                             new DefaultMultipleComponentLeaderElectionService(
                                     fatalErrorHandler,
                                     new 
ZooKeeperMultipleComponentLeaderElectionDriverFactory(
-                                            leaderNamespacedCuratorFramework));
+                                            
ZooKeeperUtils.useNamespaceAndEnsurePath(
+                                                    
curatorFrameworkWrapper.asCuratorFramework(),
+                                                    
ZooKeeperUtils.getLeaderPath())));
                 } catch (Exception e) {
                     throw new FlinkRuntimeException(
                             String.format(
@@ -119,30 +241,9 @@ public class ZooKeeperMultipleComponentLeaderElectionHaServices
     protected LeaderRetrievalService createLeaderRetrievalService(String 
leaderPath) {
         // Maybe use a single service for leader retrieval
         return ZooKeeperUtils.createLeaderRetrievalService(
-                leaderNamespacedCuratorFramework, leaderPath, configuration);
-    }
-
-    @Override
-    protected void internalClose() throws Exception {
-        Exception exception = null;
-        synchronized (lock) {
-            if (multipleComponentLeaderElectionService != null) {
-                try {
-                    multipleComponentLeaderElectionService.close();
-                } catch (Exception e) {
-                    exception = e;
-                }
-                multipleComponentLeaderElectionService = null;
-            }
-        }
-
-        try {
-            super.internalClose();
-        } catch (Exception e) {
-            exception = ExceptionUtils.firstOrSuppressed(e, exception);
-        }
-
-        ExceptionUtils.tryRethrowException(exception);
+                curatorFrameworkWrapper.asCuratorFramework(),
+                
ZooKeeperUtils.generateZookeeperPath(ZooKeeperUtils.getLeaderPath(), 
leaderPath),
+                configuration);
     }
 
     @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
index 56a61ea544c..3411fbb6b3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import 
org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.ExceptionUtils;
@@ -264,8 +265,7 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver
 
     /**
      * This selector finds all connection info nodes. See {@link
-     * 
org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices}
-     * for more details on the Znode layout.
+     * ZooKeeperMultipleComponentLeaderElectionHaServices} for more details on 
the Znode layout.
      */
     private static class ConnectionInfoNodeSelector implements 
TreeCacheSelector {
         @Override

Reply via email to