This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 548d70aa50a43b9a8c7c901bdefbb75b85959904
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Mon Jul 12 09:44:05 2021 +0200

    [FLINK-18783] RpcSystem extends AutoCloseable
---
 .../src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java |  6 +++++-
 .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java    |  9 +++++++--
 .../org/apache/flink/runtime/minicluster/MiniCluster.java     | 11 ++++++++++-
 .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java  |  9 +++++++--
 4 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
index 3bc19bd..adc31c6 100644
--- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
@@ -27,7 +27,7 @@ import java.util.ServiceLoader;
  * This interface serves as a factory interface for RPC services, with some additional utilities
  * that are reliant on implementation details of the RPC service.
  */
-public interface RpcSystem extends RpcSystemUtils {
+public interface RpcSystem extends RpcSystemUtils, AutoCloseable {
 
     /**
      * Returns a builder for an {@link RpcService} that is only reachable from the local machine.
@@ -51,6 +51,10 @@ public interface RpcSystem extends RpcSystemUtils {
             @Nullable String externalAddress,
             String externalPortRange);
 
+    /** Hook to cleanup resources, like common thread pools or classloaders. */
+    @Override
+    default void close() {}
+
     /** Builder for {@link RpcService}. */
     interface RpcServiceBuilder {
         RpcServiceBuilder withComponentName(String name);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index c0d3063..8b24dfa 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -150,6 +150,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
     private ExecutionGraphInfoStore executionGraphInfoStore;
 
     private final Thread shutDownHook;
+    private RpcSystem rpcSystem;
 
     protected ClusterEntrypoint(Configuration configuration) {
         this.configuration = generateClusterConfiguration(configuration);
@@ -293,7 +294,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
         LOG.info("Initializing cluster services.");
 
         synchronized (lock) {
-            final RpcSystem rpcSystem = RpcSystem.load();
+            rpcSystem = RpcSystem.load();
 
             commonRpcService =
                     RpcUtils.createRemoteRpcService(
@@ -499,8 +500,12 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
                     FutureUtils.composeAfterwards(
                             shutDownApplicationFuture, () -> stopClusterServices(cleanupHaData));
 
+            final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
+                    FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close);
+
             final CompletableFuture<Void> cleanupDirectoriesFuture =
-                    FutureUtils.runAfterwards(serviceShutdownFuture, this::cleanupDirectories);
+                    FutureUtils.runAfterwards(
+                            rpcSystemClassLoaderCloseFuture, this::cleanupDirectories);
 
             cleanupDirectoriesFuture.whenComplete(
                     (Void ignored2, Throwable serviceThrowable) -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3f884c8..c9f24d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -197,6 +197,9 @@ public class MiniCluster implements AutoCloseableAsync {
     /** Flag marking the mini cluster as started/running. */
     private volatile boolean running;
 
+    @GuardedBy("lock")
+    private RpcSystem rpcSystem;
+
     // ------------------------------------------------------------------------
 
     /**
@@ -273,7 +276,7 @@ public class MiniCluster implements AutoCloseableAsync {
             try {
                 initializeIOFormatClasses(configuration);
 
-                final RpcSystem rpcSystem = RpcSystem.load();
+                rpcSystem = RpcSystem.load();
 
                 LOG.info("Starting Metrics Registry");
                 metricRegistry =
@@ -1064,6 +1067,12 @@ public class MiniCluster implements AutoCloseableAsync {
                 haServices = null;
             }
 
+            try {
+                rpcSystem.close();
+            } catch (Exception e) {
+                exception = ExceptionUtils.firstOrSuppressed(e, exception);
+            }
+
             if (exception != null) {
                 throw exception;
             }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index d06ce6f..aaf7aa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -128,6 +128,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
     private final CompletableFuture<Result> terminationFuture;
 
+    private final RpcSystem rpcSystem;
+
     private boolean shutdown;
 
     public TaskManagerRunner(
@@ -137,7 +139,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
             throws Exception {
         this.configuration = checkNotNull(configuration);
 
-        final RpcSystem rpcSystem = RpcSystem.load();
+        rpcSystem = RpcSystem.load();
 
         timeout = Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
 
@@ -252,7 +254,10 @@ public class TaskManagerRunner implements FatalErrorHandler {
                         FutureUtils.composeAfterwards(
                                 taskManagerTerminationFuture, this::shutDownServices);
 
-                serviceTerminationFuture.whenComplete(
+                final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
+                        FutureUtils.runAfterwards(serviceTerminationFuture, rpcSystem::close);
+
+                rpcSystemClassLoaderCloseFuture.whenComplete(
                         (Void ignored, Throwable throwable) -> {
                             if (throwable != null) {
                                 terminationFuture.completeExceptionally(throwable);

Reply via email to