This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2943006aec1febcfd1c33eef1b1f44367019a98
Author: wangyang0918 <danrtsey...@alibaba-inc.com>
AuthorDate: Thu Apr 1 17:24:50 2021 +0800

    [FLINK-21008][coordination] Register ClusterEntrypoint#closeAsync as shutdown hook for the cleanup
    
    This closes #15396.
---
 .../runtime/entrypoint/ClusterEntrypoint.java      |  43 ++-
 .../DispatcherResourceManagerComponent.java        |  28 +-
 .../runtime/entrypoint/ClusterEntrypointTest.java  | 387 ++++++++++++++++++++-
 .../TestingHighAvailabilityServices.java           |  18 +-
 .../TestingHighAvailabilityServicesBuilder.java    |  21 ++
 .../flink/runtime/testutils/TestJvmProcess.java    |  19 +
 .../testutils/TestingClusterEntrypointProcess.java | 136 ++++++++
 .../flink/runtime/util/BlockingShutdownTest.java   |  11 +-
 .../recovery/ProcessFailureCancelingITCase.java    |   2 +-
 9 files changed, 625 insertions(+), 40 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index aab1c7a..69b090d 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -165,7 +165,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
 
         shutDownHook =
                 ShutdownHookUtil.addShutdownHook(
-                        this::cleanupDirectories, getClass().getSimpleName(), 
LOG);
+                        () -> this.closeAsync().join(), 
getClass().getSimpleName(), LOG);
     }
 
     public CompletableFuture<ApplicationStatus> getTerminationFuture() {
@@ -198,6 +198,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                 // clean up any partial state
                 shutDownAsync(
                                 ApplicationStatus.FAILED,
+                                ShutdownBehaviour.STOP_APPLICATION,
                                 
ExceptionUtils.stringifyException(strippedThrowable),
                                 false)
                         .get(
@@ -266,13 +267,18 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                                 if (throwable != null) {
                                     shutDownAsync(
                                             ApplicationStatus.UNKNOWN,
+                                            ShutdownBehaviour.STOP_APPLICATION,
                                             
ExceptionUtils.stringifyException(throwable),
                                             false);
                                 } else {
                                     // This is the general shutdown path. If a 
separate more
                                     // specific shutdown was
                                     // already triggered, this will do nothing
-                                    shutDownAsync(applicationStatus, null, 
true);
+                                    shutDownAsync(
+                                            applicationStatus,
+                                            ShutdownBehaviour.STOP_APPLICATION,
+                                            null,
+                                            true);
                                 }
                             });
         }
@@ -363,10 +369,13 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
 
     @Override
     public CompletableFuture<Void> closeAsync() {
+        ShutdownHookUtil.removeShutdownHook(shutDownHook, 
getClass().getSimpleName(), LOG);
+
         return shutDownAsync(
                         ApplicationStatus.UNKNOWN,
+                        ShutdownBehaviour.STOP_PROCESS,
                         "Cluster entrypoint has been closed externally.",
-                        true)
+                        false)
                 .thenAccept(ignored -> {});
     }
 
@@ -465,6 +474,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
 
     private CompletableFuture<ApplicationStatus> shutDownAsync(
             ApplicationStatus applicationStatus,
+            ShutdownBehaviour shutdownBehaviour,
             @Nullable String diagnostics,
             boolean cleanupHaData) {
         if (isShutDown.compareAndSet(false, true)) {
@@ -475,7 +485,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                     diagnostics);
 
             final CompletableFuture<Void> shutDownApplicationFuture =
-                    closeClusterComponent(applicationStatus, diagnostics);
+                    closeClusterComponent(applicationStatus, 
shutdownBehaviour, diagnostics);
 
             final CompletableFuture<Void> serviceShutdownFuture =
                     FutureUtils.composeAfterwards(
@@ -498,19 +508,27 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
     }
 
     /**
-     * Deregister the Flink application from the resource management system by 
signalling the {@link
-     * ResourceManager}.
+     * Close cluster components and deregister the Flink application from the 
resource management
+     * system by signalling the {@link ResourceManager}.
      *
      * @param applicationStatus to terminate the application with
+     * @param shutdownBehaviour shutdown behaviour
      * @param diagnostics additional information about the shut down, can be 
{@code null}
      * @return Future which is completed once the shut down
      */
     private CompletableFuture<Void> closeClusterComponent(
-            ApplicationStatus applicationStatus, @Nullable String diagnostics) 
{
+            ApplicationStatus applicationStatus,
+            ShutdownBehaviour shutdownBehaviour,
+            @Nullable String diagnostics) {
         synchronized (lock) {
             if (clusterComponent != null) {
-                return clusterComponent.deregisterApplicationAndClose(
-                        applicationStatus, diagnostics);
+                switch (shutdownBehaviour) {
+                    case STOP_APPLICATION:
+                        return 
clusterComponent.stopApplication(applicationStatus, diagnostics);
+                    case STOP_PROCESS:
+                    default:
+                        return clusterComponent.stopProcess();
+                }
             } else {
                 return CompletableFuture.completedFuture(null);
             }
@@ -523,8 +541,6 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
      * @throws IOException if the temporary directories could not be cleaned up
      */
     protected void cleanupDirectories() throws IOException {
-        ShutdownHookUtil.removeShutdownHook(shutDownHook, 
getClass().getSimpleName(), LOG);
-
         final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);
 
         FileUtils.deleteDirectory(new File(webTmpDir));
@@ -615,4 +631,9 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
         /** Directly stops after the job has finished. */
         DETACHED
     }
+
+    private enum ShutdownBehaviour {
+        STOP_APPLICATION,
+        STOP_PROCESS,
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
index bb4ba04..477ac9a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 /**
  * Component which starts a {@link Dispatcher}, {@link ResourceManager} and 
{@link
@@ -113,23 +114,36 @@ public class DispatcherResourceManagerComponent 
implements AutoCloseableAsync {
 
     /**
      * Deregister the Flink application from the resource management system by 
signalling the {@link
-     * ResourceManager}.
+     * ResourceManager} and also stop the process.
      *
      * @param applicationStatus to terminate the application with
      * @param diagnostics additional information about the shut down, can be 
{@code null}
      * @return Future which is completed once the shut down
      */
-    public CompletableFuture<Void> deregisterApplicationAndClose(
+    public CompletableFuture<Void> stopApplication(
             final ApplicationStatus applicationStatus, final @Nullable String 
diagnostics) {
+        return internalShutdown(() -> deregisterApplication(applicationStatus, 
diagnostics));
+    }
+
+    /**
+     * Close the web monitor and cluster components. This method will not 
deregister the Flink
+     * application from the resource management and only stop the process.
+     *
+     * @return Future which is completed once the shut down
+     */
+    public CompletableFuture<Void> stopProcess() {
+        return internalShutdown(FutureUtils::completedVoidFuture);
+    }
 
+    private CompletableFuture<Void> internalShutdown(
+            final Supplier<CompletableFuture<?>> additionalShutdownAction) {
         if (isRunning.compareAndSet(true, false)) {
-            final CompletableFuture<Void> 
closeWebMonitorAndDeregisterAppFuture =
+            final CompletableFuture<Void> 
closeWebMonitorAndAdditionalShutdownActionFuture =
                     FutureUtils.composeAfterwards(
-                            webMonitorEndpoint.closeAsync(),
-                            () -> deregisterApplication(applicationStatus, 
diagnostics));
+                            webMonitorEndpoint.closeAsync(), 
additionalShutdownAction);
 
             return FutureUtils.composeAfterwards(
-                    closeWebMonitorAndDeregisterAppFuture, 
this::closeAsyncInternal);
+                    closeWebMonitorAndAdditionalShutdownActionFuture, 
this::closeAsyncInternal);
         } else {
             return terminationFuture;
         }
@@ -188,7 +202,7 @@ public class DispatcherResourceManagerComponent implements 
AutoCloseableAsync {
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        return deregisterApplicationAndClose(
+        return stopApplication(
                 ApplicationStatus.CANCELED, 
"DispatcherResourceManagerComponent has been closed.");
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index 12b5b97..db497bd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -22,50 +22,417 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
+import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
+import org.apache.flink.runtime.dispatcher.runner.TestingDispatcherRunner;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
 import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import 
org.apache.flink.runtime.resourcemanager.ArbitraryWorkerResourceSpecFactory;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import 
org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.runtime.testutils.TestingClusterEntrypointProcess;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nullable;
+
+import java.io.File;
 import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 /** Tests for the {@link ClusterEntrypoint}. */
-public class ClusterEntrypointTest {
+public class ClusterEntrypointTest extends TestLogger {
+
+    private static final long TIMEOUT_MS = 3000;
+
+    private Configuration flinkConfig;
+
+    @ClassRule
+    public static final TestExecutorResource<?> TEST_EXECUTOR_RESOURCE =
+            new TestExecutorResource<>(Executors::newSingleThreadExecutor);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    @Before
+    public void before() {
+        flinkConfig = new Configuration();
+    }
 
     @Test(expected = IllegalConfigurationException.class)
     public void testStandaloneSessionClusterEntrypointDeniedInReactiveMode() {
-        Configuration configuration = new Configuration();
-        configuration.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
-        new MockEntryPoint(configuration);
+        flinkConfig.set(JobManagerOptions.SCHEDULER_MODE, 
SchedulerExecutionMode.REACTIVE);
+        new TestingEntryPoint.Builder().setConfiguration(flinkConfig).build();
         fail("Entrypoint initialization is supposed to fail");
     }
 
-    private static class MockEntryPoint extends ClusterEntrypoint {
+    @Test
+    public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder()
+                        .setCloseFuture(closeFuture)
+                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        .setHighAvailabilityServices(testingHaService)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        testingEntryPoint.closeAsync();
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.UNKNOWN));
+        assertThat(closeFuture.isDone(), is(true));
+        assertThat(closeAndCleanupAllDataFuture.isDone(), is(false));
+    }
+
+    @Test
+    public void testCloseAsyncShouldNotDeregisterApp() throws Exception {
+        final CompletableFuture<Void> deregisterFuture = new 
CompletableFuture<>();
+        final TestingResourceManagerFactory testingResourceManagerFactory =
+                new TestingResourceManagerFactory.Builder()
+                        .setInternalDeregisterApplicationConsumer(
+                                (ignored1, ignored2) -> 
deregisterFuture.complete(null))
+                        .build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        
.setResourceManagerFactory(testingResourceManagerFactory)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        testingEntryPoint.closeAsync();
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.UNKNOWN));
+        assertThat(deregisterFuture.isDone(), is(false));
+    }
+
+    @Test
+    public void 
testClusterFinishedNormallyShouldDeregisterAppAndCleanupHAData() throws 
Exception {
+        final CompletableFuture<Void> deregisterFuture = new 
CompletableFuture<>();
+        final CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        final CompletableFuture<ApplicationStatus> dispatcherShutDownFuture =
+                new CompletableFuture<>();
+
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder()
+                        
.setCloseAndCleanupAllDataFuture(closeAndCleanupAllDataFuture)
+                        .build();
+        final TestingResourceManagerFactory testingResourceManagerFactory =
+                new TestingResourceManagerFactory.Builder()
+                        .setInternalDeregisterApplicationConsumer(
+                                (ignored1, ignored2) -> 
deregisterFuture.complete(null))
+                        .build();
+        final TestingDispatcherRunnerFactory testingDispatcherRunnerFactory =
+                new TestingDispatcherRunnerFactory.Builder()
+                        .setShutDownFuture(dispatcherShutDownFuture)
+                        .build();
+
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        
.setResourceManagerFactory(testingResourceManagerFactory)
+                        
.setDispatcherRunnerFactory(testingDispatcherRunnerFactory)
+                        .setHighAvailabilityServices(testingHaService)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        dispatcherShutDownFuture.complete(ApplicationStatus.SUCCEEDED);
 
-        protected MockEntryPoint(Configuration configuration) {
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.SUCCEEDED));
+        assertThat(deregisterFuture.isDone(), is(true));
+        assertThat(closeAndCleanupAllDataFuture.isDone(), is(true));
+    }
+
+    @Test
+    public void testCloseAsyncShouldBeExecutedInShutdownHook() throws 
Exception {
+        // This test only works on Linux and Mac OS because we are sending 
SIGTERM to a
+        // JAVA process via "kill {pid}"
+        assumeTrue(OperatingSystem.isLinux() || OperatingSystem.isMac());
+        final File markerFile = new File(TEMPORARY_FOLDER.getRoot(), 
UUID.randomUUID() + ".marker");
+        final TestingClusterEntrypointProcess clusterEntrypointProcess =
+                new TestingClusterEntrypointProcess(markerFile);
+        try {
+            clusterEntrypointProcess.startProcess();
+            final long pid = clusterEntrypointProcess.getProcessId();
+            assertTrue("Cannot determine process ID", pid != -1);
+
+            // wait for the marker file to appear, which means the process is 
up properly
+            TestJvmProcess.waitForMarkerFile(markerFile, 30000);
+
+            TestJvmProcess.killProcessWithSigTerm(pid);
+
+            final boolean exited =
+                    clusterEntrypointProcess.waitFor(TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+            assertThat(
+                    String.format("Process %s does not exit within %s ms", 
pid, TIMEOUT_MS),
+                    exited,
+                    is(true));
+            assertThat(
+                    "markerFile should be deleted in closeAsync shutdownHook",
+                    markerFile.exists(),
+                    is(false));
+        } finally {
+            clusterEntrypointProcess.destroy();
+        }
+    }
+
+    private CompletableFuture<ApplicationStatus> startClusterEntrypoint(
+            TestingEntryPoint testingEntryPoint) throws Exception {
+        testingEntryPoint.startCluster();
+        return FutureUtils.supplyAsync(
+                () -> testingEntryPoint.getTerminationFuture().get(),
+                TEST_EXECUTOR_RESOURCE.getExecutor());
+    }
+
+    private static class TestingEntryPoint extends ClusterEntrypoint {
+
+        private final HighAvailabilityServices haService;
+
+        private final ResourceManagerFactory<ResourceID> 
resourceManagerFactory;
+
+        private final DispatcherRunnerFactory dispatcherRunnerFactory;
+
+        private TestingEntryPoint(
+                Configuration configuration,
+                HighAvailabilityServices haService,
+                ResourceManagerFactory<ResourceID> resourceManagerFactory,
+                DispatcherRunnerFactory dispatcherRunnerFactory) {
             super(configuration);
+            SignalHandler.register(LOG);
+            this.haService = haService;
+            this.resourceManagerFactory = resourceManagerFactory;
+            this.dispatcherRunnerFactory = dispatcherRunnerFactory;
         }
 
         @Override
         protected DispatcherResourceManagerComponentFactory
                 createDispatcherResourceManagerComponentFactory(Configuration 
configuration)
                         throws IOException {
-            throw new UnsupportedOperationException("Not needed for this 
test");
+            return new DefaultDispatcherResourceManagerComponentFactory(
+                    dispatcherRunnerFactory,
+                    resourceManagerFactory,
+                    SessionRestEndpointFactory.INSTANCE);
         }
 
         @Override
         protected ExecutionGraphInfoStore 
createSerializableExecutionGraphStore(
-                Configuration configuration, ScheduledExecutor 
scheduledExecutor)
-                throws IOException {
-            throw new UnsupportedOperationException("Not needed for this 
test");
+                Configuration configuration, ScheduledExecutor 
scheduledExecutor) {
+            return new MemoryExecutionGraphInfoStore();
+        }
+
+        @Override
+        protected HighAvailabilityServices createHaServices(
+                Configuration configuration, Executor executor) {
+            return haService;
         }
 
         @Override
         protected boolean supportsReactiveMode() {
             return false;
         }
+
+        public static final class Builder {
+            private HighAvailabilityServices haService =
+                    new TestingHighAvailabilityServicesBuilder().build();
+
+            private ResourceManagerFactory<ResourceID> resourceManagerFactory =
+                    StandaloneResourceManagerFactory.getInstance();
+
+            private DispatcherRunnerFactory dispatcherRunnerFactory =
+                    new TestingDispatcherRunnerFactory.Builder().build();
+
+            private Configuration configuration = new Configuration();
+
+            public Builder 
setHighAvailabilityServices(HighAvailabilityServices haService) {
+                this.haService = haService;
+                return this;
+            }
+
+            public Builder setResourceManagerFactory(
+                    ResourceManagerFactory<ResourceID> resourceManagerFactory) 
{
+                this.resourceManagerFactory = resourceManagerFactory;
+                return this;
+            }
+
+            public Builder setConfiguration(Configuration configuration) {
+                this.configuration = configuration;
+                return this;
+            }
+
+            public Builder setDispatcherRunnerFactory(
+                    DispatcherRunnerFactory dispatcherRunnerFactory) {
+                this.dispatcherRunnerFactory = dispatcherRunnerFactory;
+                return this;
+            }
+
+            public TestingEntryPoint build() {
+                return new TestingEntryPoint(
+                        configuration, haService, resourceManagerFactory, 
dispatcherRunnerFactory);
+            }
+        }
+    }
+
+    private static class TestingDispatcherRunnerFactory implements 
DispatcherRunnerFactory {
+
+        private final CompletableFuture<ApplicationStatus> shutDownFuture;
+
+        private TestingDispatcherRunnerFactory(
+                CompletableFuture<ApplicationStatus> shutDownFuture) {
+            this.shutDownFuture = shutDownFuture;
+        }
+
+        @Override
+        public DispatcherRunner createDispatcherRunner(
+                LeaderElectionService leaderElectionService,
+                FatalErrorHandler fatalErrorHandler,
+                JobGraphStoreFactory jobGraphStoreFactory,
+                Executor ioExecutor,
+                RpcService rpcService,
+                PartialDispatcherServices partialDispatcherServices)
+                throws Exception {
+            return 
TestingDispatcherRunner.newBuilder().setShutDownFuture(shutDownFuture).build();
+        }
+
+        public static final class Builder {
+            private CompletableFuture<ApplicationStatus> shutDownFuture = new 
CompletableFuture<>();
+
+            public Builder 
setShutDownFuture(CompletableFuture<ApplicationStatus> shutDownFuture) {
+                this.shutDownFuture = shutDownFuture;
+                return this;
+            }
+
+            public TestingDispatcherRunnerFactory build() {
+                return new TestingDispatcherRunnerFactory(shutDownFuture);
+            }
+        }
+    }
+
+    private static class TestingResourceManagerFactory extends 
ResourceManagerFactory<ResourceID> {
+
+        private final BiConsumer<ApplicationStatus, String> 
deregisterAppConsumer;
+
+        private TestingResourceManagerFactory(
+                BiConsumer<ApplicationStatus, String> deregisterAppConsumer) {
+            this.deregisterAppConsumer = deregisterAppConsumer;
+        }
+
+        @Override
+        protected ResourceManager<ResourceID> createResourceManager(
+                Configuration configuration,
+                ResourceID resourceId,
+                RpcService rpcService,
+                HighAvailabilityServices highAvailabilityServices,
+                HeartbeatServices heartbeatServices,
+                FatalErrorHandler fatalErrorHandler,
+                ClusterInformation clusterInformation,
+                @Nullable String webInterfaceUrl,
+                ResourceManagerMetricGroup resourceManagerMetricGroup,
+                ResourceManagerRuntimeServices resourceManagerRuntimeServices,
+                Executor ioExecutor)
+                throws Exception {
+            final SlotManager slotManager =
+                    SlotManagerBuilder.newBuilder()
+                            
.setScheduledExecutor(rpcService.getScheduledExecutor())
+                            .build();
+            final JobLeaderIdService jobLeaderIdService =
+                    new TestingJobLeaderIdService.Builder().build();
+            return new TestingResourceManager(
+                    rpcService,
+                    resourceId,
+                    highAvailabilityServices,
+                    heartbeatServices,
+                    slotManager,
+                    NoOpResourceManagerPartitionTracker::get,
+                    jobLeaderIdService,
+                    fatalErrorHandler,
+                    resourceManagerMetricGroup) {
+                @Override
+                protected void internalDeregisterApplication(
+                        ApplicationStatus finalStatus, @Nullable String 
diagnostics) {
+                    deregisterAppConsumer.accept(finalStatus, diagnostics);
+                }
+            };
+        }
+
+        @Override
+        protected ResourceManagerRuntimeServicesConfiguration
+                
createResourceManagerRuntimeServicesConfiguration(Configuration configuration)
+                        throws ConfigurationException {
+            return 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(
+                    configuration, 
ArbitraryWorkerResourceSpecFactory.INSTANCE);
+        }
+
+        public static final class Builder {
+            private BiConsumer<ApplicationStatus, String> 
deregisterAppConsumer =
+                    (ignore1, ignore2) -> {};
+
+            public Builder setInternalDeregisterApplicationConsumer(
+                    BiConsumer<ApplicationStatus, String> biConsumer) {
+                this.deregisterAppConsumer = biConsumer;
+                return this;
+            }
+
+            public TestingResourceManagerFactory build() {
+                return new 
TestingResourceManagerFactory(deregisterAppConsumer);
+            }
+        }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index d805f4e..0afc981 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
@@ -67,6 +68,10 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
 
     private volatile RunningJobsRegistry runningJobsRegistry = new 
StandaloneRunningJobsRegistry();
 
+    private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+
+    private CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+
     // ------------------------------------------------------------------------
     //  Setters for mock / testing implementations
     // ------------------------------------------------------------------------
@@ -131,6 +136,15 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
         this.jobMasterLeaderRetrieverFunction = 
jobMasterLeaderRetrieverFunction;
     }
 
+    public void setCloseFuture(CompletableFuture<Void> closeFuture) {
+        this.closeFuture = closeFuture;
+    }
+
+    public void setCloseAndCleanupAllDataFuture(
+            CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+        this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+    }
+
     // ------------------------------------------------------------------------
     //  HA Services Methods
     // ------------------------------------------------------------------------
@@ -256,11 +270,11 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
 
     @Override
     public void close() throws Exception {
-        // nothing to do
+        closeFuture.complete(null);
     }
 
     @Override
     public void closeAndCleanupAllData() throws Exception {
-        // nothing to do
+        closeAndCleanupAllDataFuture.complete(null);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
index 64e2b07..dbf5269 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServicesBuilder.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
 /** Builder for the {@link TestingHighAvailabilityServices}. */
@@ -70,6 +71,10 @@ public class TestingHighAvailabilityServicesBuilder {
 
     private RunningJobsRegistry runningJobsRegistry = new 
StandaloneRunningJobsRegistry();
 
+    private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+
+    private CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+
     public TestingHighAvailabilityServices build() {
         final TestingHighAvailabilityServices testingHighAvailabilityServices =
                 new TestingHighAvailabilityServices();
@@ -96,6 +101,10 @@ public class TestingHighAvailabilityServicesBuilder {
         testingHighAvailabilityServices.setJobGraphStore(jobGraphStore);
         
testingHighAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
 
+        testingHighAvailabilityServices.setCloseFuture(closeFuture);
+        testingHighAvailabilityServices.setCloseAndCleanupAllDataFuture(
+                closeAndCleanupAllDataFuture);
+
         return testingHighAvailabilityServices;
     }
 
@@ -163,4 +172,16 @@ public class TestingHighAvailabilityServicesBuilder {
         this.runningJobsRegistry = runningJobsRegistry;
         return this;
     }
+
+    public TestingHighAvailabilityServicesBuilder setCloseFuture(
+            CompletableFuture<Void> closeFuture) {
+        this.closeFuture = closeFuture;
+        return this;
+    }
+
+    public TestingHighAvailabilityServicesBuilder 
setCloseAndCleanupAllDataFuture(
+            CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+        this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+        return this;
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 65e2c33..8ecf6db 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -31,6 +31,7 @@ import java.io.StringWriter;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.createTemporaryLog4JProperties;
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
@@ -288,6 +289,15 @@ public abstract class TestJvmProcess {
         }
     }
 
+    public boolean waitFor(long timeout, TimeUnit unit) throws 
InterruptedException {
+        final Process process = this.process;
+        if (process != null) {
+            return process.waitFor(timeout, unit);
+        } else {
+            throw new IllegalStateException("process not started");
+        }
+    }
+
     public int exitCode() {
         Process process = this.process;
         if (process != null) {
@@ -324,6 +334,15 @@ public abstract class TestJvmProcess {
         }
     }
 
+    public static void killProcessWithSigTerm(long pid) throws Exception {
+        // send it a regular kill command (SIG_TERM)
+        final Process kill = Runtime.getRuntime().exec("kill " + pid);
+        kill.waitFor();
+        if (kill.exitValue() != 0) {
+            fail("failed to send SIG_TERM to process " + pid);
+        }
+    }
+
     public static void waitForMarkerFiles(File basedir, String prefix, int 
num, long timeout) {
         long now = System.currentTimeMillis();
         final long deadline = now + timeout;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java
new file mode 100644
index 0000000..9591e35
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingClusterEntrypointProcess.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
+import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A testing {@link ClusterEntrypoint} instance running in a separate JVM. */
+public class TestingClusterEntrypointProcess extends TestJvmProcess {
+
+    private final File markerFile;
+
+    public TestingClusterEntrypointProcess(File markerFile) throws Exception {
+        this.markerFile = checkNotNull(markerFile, "marker file");
+    }
+
+    @Override
+    public String getName() {
+        return getClass().getCanonicalName();
+    }
+
+    @Override
+    public String[] getJvmArgs() {
+        return new String[] {markerFile.getAbsolutePath()};
+    }
+
+    @Override
+    public String getEntryPointClassName() {
+        return TestingClusterEntrypointProcessEntryPoint.class.getName();
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getCanonicalName();
+    }
+
+    /** Entrypoint for the testing cluster entrypoint process. */
+    public static class TestingClusterEntrypointProcessEntryPoint {
+
+        private static final Logger LOG =
+                
LoggerFactory.getLogger(TestingClusterEntrypointProcessEntryPoint.class);
+
+        public static void main(String[] args) {
+            try {
+                final File markerFile = new File(args[0]);
+
+                final Configuration config = new Configuration();
+                config.setInteger(JobManagerOptions.PORT, 0);
+                config.setString(RestOptions.BIND_PORT, "0");
+
+                final TestingClusterEntrypoint clusterEntrypoint =
+                        new TestingClusterEntrypoint(config, markerFile);
+
+                SignalHandler.register(LOG);
+                clusterEntrypoint.startCluster();
+                TestJvmProcess.touchFile(markerFile);
+                final int returnCode =
+                        
clusterEntrypoint.getTerminationFuture().get().processExitCode();
+                System.exit(returnCode);
+            } catch (Throwable t) {
+                LOG.error("Failed to start TestingClusterEntrypoint process", 
t);
+                System.exit(1);
+            }
+        }
+    }
+
+    private static class TestingClusterEntrypoint extends ClusterEntrypoint {
+
+        private final File markerFile;
+
+        protected TestingClusterEntrypoint(Configuration configuration, File 
markerFile) {
+            super(configuration);
+            this.markerFile = markerFile;
+        }
+
+        @Override
+        protected DispatcherResourceManagerComponentFactory
+                createDispatcherResourceManagerComponentFactory(Configuration 
configuration)
+                        throws IOException {
+            return 
DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
+                    StandaloneResourceManagerFactory.getInstance());
+        }
+
+        @Override
+        protected ExecutionGraphInfoStore 
createSerializableExecutionGraphStore(
+                Configuration configuration, ScheduledExecutor 
scheduledExecutor)
+                throws IOException {
+            return new MemoryExecutionGraphInfoStore();
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            return super.closeAsync()
+                    .thenRun(
+                            () -> {
+                                LOG.info("Deleting markerFile {}", markerFile);
+                                IOUtils.deleteFileQuietly(markerFile.toPath());
+                            });
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
index 8489a99..b4083fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.UUID;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
@@ -60,10 +59,7 @@ public class BlockingShutdownTest {
             // wait for the marker file to appear, which means the process is 
up properly
             TestJvmProcess.waitForMarkerFile(markerFile, 30000);
 
-            // send it a regular kill command (SIG_TERM)
-            Process kill = Runtime.getRuntime().exec("kill " + pid);
-            kill.waitFor();
-            assertEquals("failed to send SIG_TERM to process", 0, 
kill.exitValue());
+            TestJvmProcess.killProcessWithSigTerm(pid);
 
             // minimal delay until the Java process object notices that the 
process is gone
             // this will not let the test fail predictably if the process is 
actually in fact going
@@ -105,10 +101,7 @@ public class BlockingShutdownTest {
             // wait for the marker file to appear, which means the process is 
up properly
             TestJvmProcess.waitForMarkerFile(markerFile, 30000);
 
-            // send it a regular kill command (SIG_TERM)
-            Process kill = Runtime.getRuntime().exec("kill " + pid);
-            kill.waitFor();
-            assertEquals("failed to send SIG_TERM to process", 0, 
kill.exitValue());
+            TestJvmProcess.killProcessWithSigTerm(pid);
 
             // the process should eventually go away
             final long deadline = System.nanoTime() + 30_000_000_000L; // 30 
secs in nanos
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 5c2d5b9..6923706 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -219,7 +219,7 @@ public class ProcessFailureCancelingITCase extends 
TestLogger {
                 taskManagerProcess.destroy();
             }
             if (dispatcherResourceManagerComponent != null) {
-                
dispatcherResourceManagerComponent.deregisterApplicationAndClose(
+                dispatcherResourceManagerComponent.stopApplication(
                         ApplicationStatus.SUCCEEDED, null);
             }
 

Reply via email to