This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b936acb7be50ccc9fed0b32fe4bc0a9185748604 Author: Matthias Pohl <matth...@ververica.com> AuthorDate: Wed Dec 15 12:40:09 2021 +0100 [FLINK-25432][runtime] Makes HighAvailabilityServices implement LocallyCleanableResource and GloballyCleanableResource --- .../apache/flink/runtime/dispatcher/Dispatcher.java | 2 +- .../highavailability/AbstractHaServices.java | 21 ++++++++++++++++----- .../highavailability/HighAvailabilityServices.java | 18 ++++++++++-------- .../dispatcher/DispatcherResourceCleanupTest.java | 2 +- .../flink/runtime/dispatcher/DispatcherTest.java | 4 ++-- .../highavailability/AbstractHaServicesTest.java | 8 ++++---- .../TestingHighAvailabilityServices.java | 17 +++++++++++------ .../zookeeper/ZooKeeperHaServicesTest.java | 2 +- 8 files changed, 46 insertions(+), 28 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index fecba4b..92fe36b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -905,7 +905,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher jobManagerMetricGroup.removeJob(jobId); if (jobGraphRemoved) { try { - highAvailabilityServices.cleanupJobData(jobId); + highAvailabilityServices.globalCleanup(jobId); } catch (Exception e) { log.warn( "Could not properly clean data for job {} stored by ha services", jobId, e); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java index d6c0418..e8e83d5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java 
@@ -31,6 +31,8 @@ import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -206,10 +208,19 @@ public abstract class AbstractHaServices implements HighAvailabilityServices { } @Override - public void cleanupJobData(JobID jobID) throws Exception { - logger.info("Clean up the high availability data for job {}.", jobID); - internalCleanupJobData(jobID); - logger.info("Finished cleaning up the high availability data for job {}.", jobID); + public CompletableFuture<Void> globalCleanupAsync(JobID jobID, Executor executor) { + return CompletableFuture.runAsync( + () -> { + logger.info("Clean up the high availability data for job {}.", jobID); + try { + internalCleanupJobData(jobID); + } catch (Exception e) { + throw new CompletionException(e); + } + logger.info( + "Finished cleaning up the high availability data for job {}.", jobID); + }, + executor); } /** @@ -264,7 +275,7 @@ public abstract class AbstractHaServices implements HighAvailabilityServices { /** * Clean up the meta data in the distributed system(e.g. Zookeeper, Kubernetes ConfigMap) for - * the specified Job. + * the specified Job. Method implementations need to be thread-safe. * * @param jobID The identifier of the job to cleanup. * @throws Exception when do the cleanup operation on external storage. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 8df9227..3e83c2b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -21,12 +21,16 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.blob.BlobStore; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource; import org.apache.flink.runtime.jobmanager.JobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.util.concurrent.FutureUtils; import java.io.IOException; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; /** * The HighAvailabilityServices give access to all services needed for a highly-available setup. In @@ -43,7 +47,8 @@ import java.util.UUID; * <li>Naming of RPC endpoints * </ul> */ -public interface HighAvailabilityServices extends ClientHighAvailabilityServices { +public interface HighAvailabilityServices + extends ClientHighAvailabilityServices, GloballyCleanableResource { // ------------------------------------------------------------------------ // Constants @@ -239,11 +244,8 @@ public interface HighAvailabilityServices extends ClientHighAvailabilityServices */ void closeAndCleanupAllData() throws Exception; - /** - * Deletes all data for specified job stored by these services in external stores. - * - * @param jobID The identifier of the job to cleanup. 
- * @throws Exception Thrown, if an exception occurred while cleaning data stored by them. - */ - default void cleanupJobData(JobID jobID) throws Exception {} + @Override + default CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) { + return FutureUtils.completedVoidFuture(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index 7e057f1..dec9318 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -160,7 +160,7 @@ public class DispatcherResourceCleanupTest extends TestLogger { jobResultStore = new SingleJobResultStore(jobId, clearedJobLatch); highAvailabilityServices.setJobResultStore(jobResultStore); cleanupJobHADataFuture = new CompletableFuture<>(); - highAvailabilityServices.setCleanupJobDataFuture(cleanupJobHADataFuture); + highAvailabilityServices.setGlobalCleanupFuture(cleanupJobHADataFuture); storedHABlobFuture = new CompletableFuture<>(); deleteAllHABlobsFuture = new CompletableFuture<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index e830a35..2a14207 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -746,7 +746,7 @@ public class DispatcherTest extends AbstractDispatcherTest { // Track cleanup - ha-services final CompletableFuture<JobID> cleanupJobData = new CompletableFuture<>(); - haServices.setCleanupJobDataFuture(cleanupJobData); + haServices.setGlobalCleanupFuture(cleanupJobData); cleanupJobData.thenAccept(jobId -> 
cleanUpEvents.add(CLEANUP_HA_SERVICES)); // Track cleanup - job-graph @@ -1156,7 +1156,7 @@ public class DispatcherTest extends AbstractDispatcherTest { // Track cleanup - ha-services final CompletableFuture<JobID> cleanupJobData = new CompletableFuture<>(); - haServices.setCleanupJobDataFuture(cleanupJobData); + haServices.setGlobalCleanupFuture(cleanupJobData); cleanupJobData.thenAccept(jobId -> cleanUpEvents.add(CLEANUP_HA_SERVICES)); // Track cleanup - job-graph diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java index a105cf8..eb97d4f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java @@ -31,6 +31,7 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.function.RunnableWithException; +import org.apache.flink.util.function.ThrowingConsumer; import org.junit.Test; @@ -40,7 +41,6 @@ import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.function.Consumer; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; @@ -130,7 +130,7 @@ public class AbstractHaServicesTest extends TestLogger { () -> {}, jobCleanupFuture::complete); - haServices.cleanupJobData(jobID); + haServices.globalCleanupAsync(jobID, Executors.directExecutor()).join(); JobID jobIDCleaned = jobCleanupFuture.get(); assertThat(jobIDCleaned, is(jobID)); } @@ -185,7 +185,7 @@ public class AbstractHaServicesTest extends TestLogger { private final Queue<? 
super CloseOperations> closeOperations; private final RunnableWithException internalCleanupRunnable; - private final Consumer<JobID> internalJobCleanupConsumer; + private final ThrowingConsumer<JobID, Exception> internalJobCleanupConsumer; private TestingHaServices( Configuration config, @@ -193,7 +193,7 @@ public class AbstractHaServicesTest extends TestLogger { BlobStoreService blobStoreService, Queue<? super CloseOperations> closeOperations, RunnableWithException internalCleanupRunnable, - Consumer<JobID> internalJobCleanupConsumer) { + ThrowingConsumer<JobID, Exception> internalJobCleanupConsumer) { super( config, ioExecutor, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index d98150a..75503f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -26,11 +26,12 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResul import org.apache.flink.runtime.jobmanager.JobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.util.concurrent.FutureUtils; import java.io.IOException; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.function.Function; /** @@ -73,7 +74,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private CompletableFuture<Void> closeAndCleanupAllDataFuture = new CompletableFuture<>(); - private volatile CompletableFuture<JobID> jobCleanupFuture; + private volatile 
CompletableFuture<JobID> globalCleanupFuture; // ------------------------------------------------------------------------ // Setters for mock / testing implementations @@ -148,8 +149,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture; } - public void setCleanupJobDataFuture(CompletableFuture<JobID> jobCleanupFuture) { - this.jobCleanupFuture = jobCleanupFuture; + public void setGlobalCleanupFuture(CompletableFuture<JobID> globalCleanupFuture) { + this.globalCleanupFuture = globalCleanupFuture; } // ------------------------------------------------------------------------ @@ -286,7 +287,11 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public void cleanupJobData(JobID jobID) { - Optional.ofNullable(jobCleanupFuture).ifPresent(f -> f.complete(jobID)); + public CompletableFuture<Void> globalCleanupAsync(JobID jobID, Executor executor) { + if (globalCleanupFuture != null) { + globalCleanupFuture.complete(jobID); + } + + return FutureUtils.completedVoidFuture(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java index 770984c..54165f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java @@ -173,7 +173,7 @@ public class ZooKeeperHaServicesTest extends TestLogger { haServices -> { final List<String> childrenBefore = client.getChildren().forPath(path); - haServices.cleanupJobData(jobID); + haServices.globalCleanupAsync(jobID, Executors.directExecutor()).join(); final List<String> childrenAfter = client.getChildren().forPath(path);