This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a7bd6de71de [FLINK-32438][runtime] Merges AbstractZooKeeperHaServices with ZooKeeperMultipleComponentLeaderElectionHaServices a7bd6de71de is described below commit a7bd6de71dea56307ca9240fee0c6000d11993e5 Author: Matthias Pohl <matthias.p...@aiven.io> AuthorDate: Mon Jun 26 13:03:43 2023 +0200 [FLINK-32438][runtime] Merges AbstractZooKeeperHaServices with ZooKeeperMultipleComponentLeaderElectionHaServices Signed-off-by: Matthias Pohl <matthias.p...@aiven.io> --- .../zookeeper/AbstractZooKeeperHaServices.java | 148 ------------------ ...rMultipleComponentLeaderElectionHaServices.java | 173 ++++++++++++++++----- ...eeperMultipleComponentLeaderElectionDriver.java | 4 +- 3 files changed, 139 insertions(+), 186 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/AbstractZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/AbstractZooKeeperHaServices.java deleted file mode 100644 index 0b56fe256a7..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/AbstractZooKeeperHaServices.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.highavailability.zookeeper; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobStoreService; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory; -import org.apache.flink.runtime.highavailability.AbstractHaServices; -import org.apache.flink.runtime.highavailability.FileSystemJobResultStore; -import org.apache.flink.runtime.jobmanager.JobGraphStore; -import org.apache.flink.runtime.util.ZooKeeperUtils; - -import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework; -import org.apache.flink.shaded.curator5.org.apache.curator.utils.ZKPaths; -import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException; - -import java.io.IOException; -import java.util.concurrent.Executor; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** Abstract ZooKeeper based HA services. */ -public abstract class AbstractZooKeeperHaServices extends AbstractHaServices { - /** The curator resource to use. */ - private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; - - public AbstractZooKeeperHaServices( - CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper, - Executor executor, - Configuration configuration, - BlobStoreService blobStoreService) - throws IOException { - super( - configuration, - executor, - blobStoreService, - FileSystemJobResultStore.fromConfiguration(configuration)); - this.curatorFrameworkWrapper = checkNotNull(curatorFrameworkWrapper); - } - - protected final CuratorFramework getCuratorFramework() { - return curatorFrameworkWrapper.asCuratorFramework(); - } - - @Override - public CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws Exception { - return new ZooKeeperCheckpointRecoveryFactory( - ZooKeeperUtils.useNamespaceAndEnsurePath( - curatorFrameworkWrapper.asCuratorFramework(), ZooKeeperUtils.getJobsPath()), - configuration, - ioExecutor); - } - - @Override - public JobGraphStore createJobGraphStore() throws Exception { - return ZooKeeperUtils.createJobGraphs( - curatorFrameworkWrapper.asCuratorFramework(), configuration); - } - - @Override - protected void internalClose() throws Exception { - curatorFrameworkWrapper.close(); - } - - @Override - protected void internalCleanup() throws Exception { - cleanupZooKeeperPaths(); - } - - @Override - protected void internalCleanupJobData(JobID jobID) throws Exception { - deleteZNode(ZooKeeperUtils.getLeaderPathForJob(jobID)); - } - - /** Cleans up leftover ZooKeeper paths. */ - private void cleanupZooKeeperPaths() throws Exception { - deleteOwnedZNode(); - tryDeleteEmptyParentZNodes(); - } - - private void deleteOwnedZNode() throws Exception { - deleteZNode("/"); - } - - protected void deleteZNode(String path) throws Exception { - ZooKeeperUtils.deleteZNode(curatorFrameworkWrapper.asCuratorFramework(), path); - } - - /** - * Tries to delete empty parent znodes. - * - * <p>IMPORTANT: This method can be removed once all supported ZooKeeper versions support the - * container {@link org.apache.zookeeper.CreateMode}. - * - * @throws Exception if the deletion fails for other reason than {@link - * KeeperException.NotEmptyException} - */ - private void tryDeleteEmptyParentZNodes() throws Exception { - // try to delete the parent znodes if they are empty - String remainingPath = - getParentPath( - getNormalizedPath( - curatorFrameworkWrapper.asCuratorFramework().getNamespace())); - final CuratorFramework nonNamespaceClient = - curatorFrameworkWrapper.asCuratorFramework().usingNamespace(null); - - while (!isRootPath(remainingPath)) { - try { - nonNamespaceClient.delete().forPath(remainingPath); - } catch (KeeperException.NotEmptyException ignored) { - // We can only delete empty znodes - break; - } - - remainingPath = getParentPath(remainingPath); - } - } - - private static boolean isRootPath(String remainingPath) { - return ZKPaths.PATH_SEPARATOR.equals(remainingPath); - } - - private static String getNormalizedPath(String path) { - return ZKPaths.makePath(path, ""); - } - - private static String getParentPath(String path) { - return ZKPaths.getPathAndNode(path).getPath(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java index bc95589f3bc..6298a13e4e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperMultipleComponentLeaderElectionHaServices.java @@ -21,6 +21,11 @@ package org.apache.flink.runtime.highavailability.zookeeper; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobStoreService; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory; +import org.apache.flink.runtime.highavailability.AbstractHaServices; +import org.apache.flink.runtime.highavailability.FileSystemJobResultStore; +import org.apache.flink.runtime.jobmanager.JobGraphStore; import org.apache.flink.runtime.leaderelection.DefaultMultipleComponentLeaderElectionService; import org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverFactory; import org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionService; @@ -32,12 +37,17 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework; +import org.apache.flink.shaded.curator5.org.apache.curator.utils.ZKPaths; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import java.io.IOException; import java.util.concurrent.Executor; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * ZooKeeper HA services that only use a single leader election per process. * @@ -59,13 +69,12 @@ import java.util.concurrent.Executor; * | | /checkpoint_id_counter * </pre> */ -public class ZooKeeperMultipleComponentLeaderElectionHaServices - extends AbstractZooKeeperHaServices { +public class ZooKeeperMultipleComponentLeaderElectionHaServices extends AbstractHaServices { + /** The curator resource to use. */ + private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; private final Object lock = new Object(); - private final CuratorFramework leaderNamespacedCuratorFramework; - private final FatalErrorHandler fatalErrorHandler; @Nullable @@ -74,18 +83,129 @@ public class ZooKeeperMultipleComponentLeaderElectionHaServices public ZooKeeperMultipleComponentLeaderElectionHaServices( CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper, - Configuration config, - Executor ioExecutor, + Configuration configuration, + Executor executor, BlobStoreService blobStoreService, FatalErrorHandler fatalErrorHandler) - throws Exception { - super(curatorFrameworkWrapper, ioExecutor, config, blobStoreService); - this.leaderNamespacedCuratorFramework = - ZooKeeperUtils.useNamespaceAndEnsurePath( - getCuratorFramework(), ZooKeeperUtils.getLeaderPath()); + throws IOException { + super( + configuration, + executor, + blobStoreService, + FileSystemJobResultStore.fromConfiguration(configuration)); + this.curatorFrameworkWrapper = checkNotNull(curatorFrameworkWrapper); + this.fatalErrorHandler = fatalErrorHandler; } + @Override + public CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws Exception { + return new ZooKeeperCheckpointRecoveryFactory( + ZooKeeperUtils.useNamespaceAndEnsurePath( + curatorFrameworkWrapper.asCuratorFramework(), ZooKeeperUtils.getJobsPath()), + configuration, + ioExecutor); + } + + @Override + public JobGraphStore createJobGraphStore() throws Exception { + return ZooKeeperUtils.createJobGraphs( + curatorFrameworkWrapper.asCuratorFramework(), configuration); + } + + @Override + protected void internalClose() throws Exception { + Exception exception = null; + synchronized (lock) { + if (multipleComponentLeaderElectionService != null) { + try { + multipleComponentLeaderElectionService.close(); + } catch (Exception e) { + exception = e; + } + multipleComponentLeaderElectionService = null; + } + } + + try { + curatorFrameworkWrapper.close(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + ExceptionUtils.tryRethrowException(exception); + } + + @Override + protected void internalCleanup() throws Exception { + cleanupZooKeeperPaths(); + } + + @Override + protected void internalCleanupJobData(JobID jobID) throws Exception { + deleteZNode(ZooKeeperUtils.getLeaderPathForJob(jobID)); + } + + /** Cleans up leftover ZooKeeper paths. */ + private void cleanupZooKeeperPaths() throws Exception { + deleteOwnedZNode(); + tryDeleteEmptyParentZNodes(); + } + + private void deleteOwnedZNode() throws Exception { + deleteZNode("/"); + } + + protected void deleteZNode(String path) throws Exception { + ZooKeeperUtils.deleteZNode(curatorFrameworkWrapper.asCuratorFramework(), path); + } + + /** + * Tries to delete empty parent znodes. + * + * <p>IMPORTANT: This method can be removed once all supported ZooKeeper versions support the + * container {@link org.apache.zookeeper.CreateMode}. + * + * @throws Exception if the deletion fails for other reason than {@link + * KeeperException.NotEmptyException} + */ + private void tryDeleteEmptyParentZNodes() throws Exception { + // try to delete the parent znodes if they are empty + String remainingPath = + getParentPath( + getNormalizedPath( + curatorFrameworkWrapper.asCuratorFramework().getNamespace())); + final CuratorFramework nonNamespaceClient = + curatorFrameworkWrapper.asCuratorFramework().usingNamespace(null); + + while (!isRootPath(remainingPath)) { + try { + nonNamespaceClient.delete().forPath(remainingPath); + } catch (KeeperException.NotEmptyException ignored) { + // We can only delete empty znodes + break; + } + + remainingPath = getParentPath(remainingPath); + } + } + + private static boolean isRootPath(String remainingPath) { + return ZKPaths.PATH_SEPARATOR.equals(remainingPath); + } + + private static String getNormalizedPath(String path) { + return ZKPaths.makePath(path, ""); + } + + private static String getParentPath(String path) { + return ZKPaths.getPathAndNode(path).getPath(); + } + + // /////////////////////////////////////////////// + // LeaderElection/-Retrieval-related methods + // /////////////////////////////////////////////// + @Override protected MultipleComponentLeaderElectionDriverFactory createLeaderElectionDriverFactory( String leaderName) { @@ -100,7 +220,9 @@ public class ZooKeeperMultipleComponentLeaderElectionHaServices new DefaultMultipleComponentLeaderElectionService( fatalErrorHandler, new ZooKeeperMultipleComponentLeaderElectionDriverFactory( - leaderNamespacedCuratorFramework)); + ZooKeeperUtils.useNamespaceAndEnsurePath( + curatorFrameworkWrapper.asCuratorFramework(), + ZooKeeperUtils.getLeaderPath()))); } catch (Exception e) { throw new FlinkRuntimeException( String.format( @@ -119,30 +241,9 @@ public class ZooKeeperMultipleComponentLeaderElectionHaServices protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath) { // Maybe use a single service for leader retrieval return ZooKeeperUtils.createLeaderRetrievalService( - leaderNamespacedCuratorFramework, leaderPath, configuration); - } - - @Override - protected void internalClose() throws Exception { - Exception exception = null; - synchronized (lock) { - if (multipleComponentLeaderElectionService != null) { - try { - multipleComponentLeaderElectionService.close(); - } catch (Exception e) { - exception = e; - } - multipleComponentLeaderElectionService = null; - } - } - - try { - super.internalClose(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - ExceptionUtils.tryRethrowException(exception); + curatorFrameworkWrapper.asCuratorFramework(), + ZooKeeperUtils.generateZookeeperPath(ZooKeeperUtils.getLeaderPath(), leaderPath), + configuration); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java index 56a61ea544c..3411fbb6b3c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperMultipleComponentLeaderElectionDriver.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.ExceptionUtils; @@ -264,8 +265,7 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver /** * This selector finds all connection info nodes. See {@link - * org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices} - * for more details on the Znode layout. + * ZooKeeperMultipleComponentLeaderElectionHaServices} for more details on the Znode layout. */ private static class ConnectionInfoNodeSelector implements TreeCacheSelector { @Override