This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b936acb7be50ccc9fed0b32fe4bc0a9185748604
Author: Matthias Pohl <matth...@ververica.com>
AuthorDate: Wed Dec 15 12:40:09 2021 +0100

    [FLINK-25432][runtime] Makes HighAvailabilityServices implement LocallyCleanableResource and GloballyCleanableResource
---
 .../apache/flink/runtime/dispatcher/Dispatcher.java |  2 +-
 .../highavailability/AbstractHaServices.java        | 21 ++++++++++++++++-----
 .../highavailability/HighAvailabilityServices.java  | 18 ++++++++++--------
 .../dispatcher/DispatcherResourceCleanupTest.java   |  2 +-
 .../flink/runtime/dispatcher/DispatcherTest.java    |  4 ++--
 .../highavailability/AbstractHaServicesTest.java    |  8 ++++----
 .../TestingHighAvailabilityServices.java            | 17 +++++++++++------
 .../zookeeper/ZooKeeperHaServicesTest.java          |  2 +-
 8 files changed, 46 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index fecba4b..92fe36b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -905,7 +905,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
         jobManagerMetricGroup.removeJob(jobId);
         if (jobGraphRemoved) {
             try {
-                highAvailabilityServices.cleanupJobData(jobId);
+                highAvailabilityServices.globalCleanup(jobId);
             } catch (Exception e) {
                 log.warn(
                         "Could not properly clean data for job {} stored by ha 
services", jobId, e);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index d6c0418..e8e83d5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -31,6 +31,8 @@ import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -206,10 +208,19 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
     }
 
     @Override
-    public void cleanupJobData(JobID jobID) throws Exception {
-        logger.info("Clean up the high availability data for job {}.", jobID);
-        internalCleanupJobData(jobID);
-        logger.info("Finished cleaning up the high availability data for job 
{}.", jobID);
+    public CompletableFuture<Void> globalCleanupAsync(JobID jobID, Executor 
executor) {
+        return CompletableFuture.runAsync(
+                () -> {
+                    logger.info("Clean up the high availability data for job 
{}.", jobID);
+                    try {
+                        internalCleanupJobData(jobID);
+                    } catch (Exception e) {
+                        throw new CompletionException(e);
+                    }
+                    logger.info(
+                            "Finished cleaning up the high availability data 
for job {}.", jobID);
+                },
+                executor);
     }
 
     /**
@@ -264,7 +275,7 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
 
     /**
      * Clean up the meta data in the distributed system(e.g. Zookeeper, 
Kubernetes ConfigMap) for
-     * the specified Job.
+     * the specified Job. Method implementations need to be thread-safe.
      *
      * @param jobID The identifier of the job to cleanup.
      * @throws Exception when do the cleanup operation on external storage.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 8df9227..3e83c2b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -21,12 +21,16 @@ package org.apache.flink.runtime.highavailability;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource;
 import org.apache.flink.runtime.jobmanager.JobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * The HighAvailabilityServices give access to all services needed for a 
highly-available setup. In
@@ -43,7 +47,8 @@ import java.util.UUID;
  *   <li>Naming of RPC endpoints
  * </ul>
  */
-public interface HighAvailabilityServices extends 
ClientHighAvailabilityServices {
+public interface HighAvailabilityServices
+        extends ClientHighAvailabilityServices, GloballyCleanableResource {
 
     // ------------------------------------------------------------------------
     //  Constants
@@ -239,11 +244,8 @@ public interface HighAvailabilityServices extends 
ClientHighAvailabilityServices
      */
     void closeAndCleanupAllData() throws Exception;
 
-    /**
-     * Deletes all data for specified job stored by these services in external 
stores.
-     *
-     * @param jobID The identifier of the job to cleanup.
-     * @throws Exception Thrown, if an exception occurred while cleaning data 
stored by them.
-     */
-    default void cleanupJobData(JobID jobID) throws Exception {}
+    @Override
+    default CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor 
executor) {
+        return FutureUtils.completedVoidFuture();
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index 7e057f1..dec9318 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -160,7 +160,7 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
         jobResultStore = new SingleJobResultStore(jobId, clearedJobLatch);
         highAvailabilityServices.setJobResultStore(jobResultStore);
         cleanupJobHADataFuture = new CompletableFuture<>();
-        
highAvailabilityServices.setCleanupJobDataFuture(cleanupJobHADataFuture);
+        
highAvailabilityServices.setGlobalCleanupFuture(cleanupJobHADataFuture);
 
         storedHABlobFuture = new CompletableFuture<>();
         deleteAllHABlobsFuture = new CompletableFuture<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index e830a35..2a14207 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -746,7 +746,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
 
         // Track cleanup - ha-services
         final CompletableFuture<JobID> cleanupJobData = new 
CompletableFuture<>();
-        haServices.setCleanupJobDataFuture(cleanupJobData);
+        haServices.setGlobalCleanupFuture(cleanupJobData);
         cleanupJobData.thenAccept(jobId -> 
cleanUpEvents.add(CLEANUP_HA_SERVICES));
 
         // Track cleanup - job-graph
@@ -1156,7 +1156,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
 
         // Track cleanup - ha-services
         final CompletableFuture<JobID> cleanupJobData = new 
CompletableFuture<>();
-        haServices.setCleanupJobDataFuture(cleanupJobData);
+        haServices.setGlobalCleanupFuture(cleanupJobData);
         cleanupJobData.thenAccept(jobId -> 
cleanUpEvents.add(CLEANUP_HA_SERVICES));
 
         // Track cleanup - job-graph
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
index a105cf8..eb97d4f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.Test;
 
@@ -40,7 +41,6 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
@@ -130,7 +130,7 @@ public class AbstractHaServicesTest extends TestLogger {
                         () -> {},
                         jobCleanupFuture::complete);
 
-        haServices.cleanupJobData(jobID);
+        haServices.globalCleanupAsync(jobID, 
Executors.directExecutor()).join();
         JobID jobIDCleaned = jobCleanupFuture.get();
         assertThat(jobIDCleaned, is(jobID));
     }
@@ -185,7 +185,7 @@ public class AbstractHaServicesTest extends TestLogger {
 
         private final Queue<? super CloseOperations> closeOperations;
         private final RunnableWithException internalCleanupRunnable;
-        private final Consumer<JobID> internalJobCleanupConsumer;
+        private final ThrowingConsumer<JobID, Exception> 
internalJobCleanupConsumer;
 
         private TestingHaServices(
                 Configuration config,
@@ -193,7 +193,7 @@ public class AbstractHaServicesTest extends TestLogger {
                 BlobStoreService blobStoreService,
                 Queue<? super CloseOperations> closeOperations,
                 RunnableWithException internalCleanupRunnable,
-                Consumer<JobID> internalJobCleanupConsumer) {
+                ThrowingConsumer<JobID, Exception> internalJobCleanupConsumer) 
{
             super(
                     config,
                     ioExecutor,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index d98150a..75503f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -26,11 +26,12 @@ import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResul
 import org.apache.flink.runtime.jobmanager.JobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.io.IOException;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.function.Function;
 
 /**
@@ -73,7 +74,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
 
     private CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
 
-    private volatile CompletableFuture<JobID> jobCleanupFuture;
+    private volatile CompletableFuture<JobID> globalCleanupFuture;
 
     // ------------------------------------------------------------------------
     //  Setters for mock / testing implementations
@@ -148,8 +149,8 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
         this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
     }
 
-    public void setCleanupJobDataFuture(CompletableFuture<JobID> 
jobCleanupFuture) {
-        this.jobCleanupFuture = jobCleanupFuture;
+    public void setGlobalCleanupFuture(CompletableFuture<JobID> 
globalCleanupFuture) {
+        this.globalCleanupFuture = globalCleanupFuture;
     }
 
     // ------------------------------------------------------------------------
@@ -286,7 +287,11 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
     }
 
     @Override
-    public void cleanupJobData(JobID jobID) {
-        Optional.ofNullable(jobCleanupFuture).ifPresent(f -> 
f.complete(jobID));
+    public CompletableFuture<Void> globalCleanupAsync(JobID jobID, Executor 
executor) {
+        if (globalCleanupFuture != null) {
+            globalCleanupFuture.complete(jobID);
+        }
+
+        return FutureUtils.completedVoidFuture();
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
index 770984c..54165f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
@@ -173,7 +173,7 @@ public class ZooKeeperHaServicesTest extends TestLogger {
                 haServices -> {
                     final List<String> childrenBefore = 
client.getChildren().forPath(path);
 
-                    haServices.cleanupJobData(jobID);
+                    haServices.globalCleanupAsync(jobID, 
Executors.directExecutor()).join();
 
                     final List<String> childrenAfter = 
client.getChildren().forPath(path);
 

Reply via email to