This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 548d70aa50a43b9a8c7c901bdefbb75b85959904 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Mon Jul 12 09:44:05 2021 +0200 [FLINK-18783] RpcSystem extends AutoCloseable --- .../src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java | 6 +++++- .../apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 9 +++++++-- .../org/apache/flink/runtime/minicluster/MiniCluster.java | 11 ++++++++++- .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java | 9 +++++++-- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java index 3bc19bd..adc31c6 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java @@ -27,7 +27,7 @@ import java.util.ServiceLoader; * This interface serves as a factory interface for RPC services, with some additional utilities * that are reliant on implementation details of the RPC service. */ -public interface RpcSystem extends RpcSystemUtils { +public interface RpcSystem extends RpcSystemUtils, AutoCloseable { /** * Returns a builder for an {@link RpcService} that is only reachable from the local machine. @@ -51,6 +51,10 @@ public interface RpcSystem extends RpcSystemUtils { @Nullable String externalAddress, String externalPortRange); + /** Hook to cleanup resources, like common thread pools or classloaders. */ + @Override + default void close() {} + /** Builder for {@link RpcService}. */ interface RpcServiceBuilder { RpcServiceBuilder withComponentName(String name); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index c0d3063..8b24dfa 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -150,6 +150,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro private ExecutionGraphInfoStore executionGraphInfoStore; private final Thread shutDownHook; + private RpcSystem rpcSystem; protected ClusterEntrypoint(Configuration configuration) { this.configuration = generateClusterConfiguration(configuration); @@ -293,7 +294,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro LOG.info("Initializing cluster services."); synchronized (lock) { - final RpcSystem rpcSystem = RpcSystem.load(); + rpcSystem = RpcSystem.load(); commonRpcService = RpcUtils.createRemoteRpcService( @@ -499,8 +500,12 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro FutureUtils.composeAfterwards( shutDownApplicationFuture, () -> stopClusterServices(cleanupHaData)); + final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture = + FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close); + final CompletableFuture<Void> cleanupDirectoriesFuture = - FutureUtils.runAfterwards(serviceShutdownFuture, this::cleanupDirectories); + FutureUtils.runAfterwards( + rpcSystemClassLoaderCloseFuture, this::cleanupDirectories); cleanupDirectoriesFuture.whenComplete( (Void ignored2, Throwable serviceThrowable) -> { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 3f884c8..c9f24d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -197,6 +197,9 @@ public class MiniCluster implements AutoCloseableAsync { /** Flag marking the mini cluster as started/running. */ private volatile boolean running; + @GuardedBy("lock") + private RpcSystem rpcSystem; + // ------------------------------------------------------------------------ /** @@ -273,7 +276,7 @@ public class MiniCluster implements AutoCloseableAsync { try { initializeIOFormatClasses(configuration); - final RpcSystem rpcSystem = RpcSystem.load(); + rpcSystem = RpcSystem.load(); LOG.info("Starting Metrics Registry"); metricRegistry = @@ -1064,6 +1067,12 @@ public class MiniCluster implements AutoCloseableAsync { haServices = null; } + try { + rpcSystem.close(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + if (exception != null) { throw exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index d06ce6f..aaf7aa1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -128,6 +128,8 @@ public class TaskManagerRunner implements FatalErrorHandler { private final CompletableFuture<Result> terminationFuture; + private final RpcSystem rpcSystem; + private boolean shutdown; public TaskManagerRunner( @@ -137,7 +139,7 @@ public class TaskManagerRunner implements FatalErrorHandler { throws Exception { this.configuration = checkNotNull(configuration); - final RpcSystem rpcSystem = RpcSystem.load(); + rpcSystem = RpcSystem.load(); timeout = Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); @@ -252,7 +254,10 @@ public class TaskManagerRunner implements FatalErrorHandler { FutureUtils.composeAfterwards( taskManagerTerminationFuture, this::shutDownServices); - serviceTerminationFuture.whenComplete( + final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture = + FutureUtils.runAfterwards(serviceTerminationFuture, rpcSystem::close); + + rpcSystemClassLoaderCloseFuture.whenComplete( (Void ignored, Throwable throwable) -> { if (throwable != null) { terminationFuture.completeExceptionally(throwable);