This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93b042da45a0bbf418e7df96059fe36eb4f18b9e
Author: Till Rohrmann <[email protected]>
AuthorDate: Sun Sep 23 22:05:28 2018 +0200

    [FLINK-10402] Port AbstractTaskManagerProcessFailureRecoveryTest to new code base
    
    This closes #6750.
---
 .../runtime/entrypoint/ClusterEntrypoint.java      |   4 +-
 ...tractTaskManagerProcessFailureRecoveryTest.java | 134 ++++-----------------
 ...skManagerProcessFailureBatchRecoveryITCase.java |   4 +-
 ...nagerProcessFailureStreamingRecoveryITCase.java |   2 -
 4 files changed, 28 insertions(+), 116 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index c9a1722..5665500 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -147,7 +147,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                return terminationFuture;
        }
 
-       protected void startCluster() throws ClusterEntrypointException {
+       public void startCluster() throws ClusterEntrypointException {
                LOG.info("Starting {}.", getClass().getSimpleName());
 
                try {
@@ -392,7 +392,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                return resultConfiguration;
        }
 
-       private CompletableFuture<ApplicationStatus> shutDownAsync(
+       public CompletableFuture<ApplicationStatus> shutDownAsync(
                        ApplicationStatus applicationStatus,
                        @Nullable String diagnostics,
                        boolean cleanupHaData) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 5cd6f30..0962ddf 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -19,29 +19,21 @@
 package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -52,17 +44,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.StringWriter;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
 import static org.junit.Assert.assertFalse;
@@ -92,6 +75,9 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
        @Rule
        public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+       @Rule
+       public final BlobServerResource blobServerResource = new 
BlobServerResource();
+
        @Test
        public void testTaskManagerProcessFailure() throws Exception {
 
@@ -99,14 +85,25 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                final StringWriter processOutput2 = new StringWriter();
                final StringWriter processOutput3 = new StringWriter();
 
-               ActorSystem jmActorSystem = null;
-               HighAvailabilityServices highAvailabilityServices = null;
                Process taskManagerProcess1 = null;
                Process taskManagerProcess2 = null;
                Process taskManagerProcess3 = null;
 
                File coordinateTempDir = null;
 
+               final int jobManagerPort = NetUtils.getAvailablePort();
+               final int restPort = NetUtils.getAvailablePort();
+
+               Configuration jmConfig = new Configuration();
+               jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+               jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+               jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+               jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 
500L);
+               jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 
10000L);
+               jmConfig.setInteger(RestOptions.PORT, restPort);
+
+               final StandaloneSessionClusterEntrypoint clusterEntrypoint = 
new StandaloneSessionClusterEntrypoint(jmConfig);
+
                try {
                        // check that we run this test only if the java command
                        // is available on this machine
@@ -124,37 +121,7 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                        // coordination between the processes goes through a 
directory
                        coordinateTempDir = temporaryFolder.newFolder();
 
-                       // find a free port to start the JobManager
-                       final int jobManagerPort = NetUtils.getAvailablePort();
-
-                       // start a JobManager
-                       Tuple2<String, Object> localAddress = new 
Tuple2<String, Object>("localhost", jobManagerPort);
-
-                       Configuration jmConfig = new Configuration();
-                       
jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
-                       jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, 
"6 s");
-                       jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
-                       
jmConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "10 s");
-                       jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-                       jmConfig.setString(JobManagerOptions.ADDRESS, 
localAddress._1());
-                       jmConfig.setInteger(JobManagerOptions.PORT, 
jobManagerPort);
-
-                       highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-                               jmConfig,
-                               TestingUtils.defaultExecutor(),
-                               
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-                       jmActorSystem = AkkaUtils.createActorSystem(jmConfig, 
new Some<>(localAddress));
-                       ActorRef jmActor = JobManager.startJobManagerActors(
-                               jmConfig,
-                               jmActorSystem,
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               highAvailabilityServices,
-                               NoOpMetricRegistry.INSTANCE,
-                               Option.empty(),
-                               JobManager.class,
-                               MemoryArchivist.class)._1();
+                       clusterEntrypoint.startCluster();
 
                        // the TaskManager java command
                        String[] command = new String[] {
@@ -163,7 +130,7 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                                        "-Dlog4j.configuration=file:" + 
tempLogFile.getAbsolutePath(),
                                        "-Xms80m", "-Xmx80m",
                                        "-classpath", getCurrentClasspath(),
-                                       
TaskManagerProcessEntryPoint.class.getName(),
+                                       
TaskExecutorProcessEntryPoint.class.getName(),
                                        String.valueOf(jobManagerPort)
                        };
 
@@ -173,10 +140,6 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                        taskManagerProcess2 = new 
ProcessBuilder(command).start();
                        new 
CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), 
processOutput2);
 
-                       // we wait for the JobManager to have the two 
TaskManagers available
-                       // since some of the CI environments are very hostile, 
we need to give this a lot of time (2 minutes)
-                       waitUntilNumTaskManagersAreRegistered(jmActor, 2, 
120000);
-
                        // the program will set a marker file in each of its 
parallel tasks once they are ready, so that
                        // this coordinating code is aware of this.
                        // the program will very slowly consume elements until 
the marker file (later created by the
@@ -189,7 +152,7 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                                @Override
                                public void run() {
                                        try {
-                                               
testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
+                                               
testTaskManagerFailure(restPort, coordinateDirClosure);
                                        }
                                        catch (Throwable t) {
                                                t.printStackTrace();
@@ -220,10 +183,6 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                        taskManagerProcess3 = new 
ProcessBuilder(command).start();
                        new 
CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), 
processOutput3);
 
-                       // we wait for the third TaskManager to register
-                       // since some of the CI environments are very hostile, 
we need to give this a lot of time (2 minutes)
-                       waitUntilNumTaskManagersAreRegistered(jmActor, 3, 
120000);
-
                        // kill one of the previous TaskManagers, triggering a 
failure and recovery
                        taskManagerProcess1.destroy();
                        taskManagerProcess1 = null;
@@ -270,13 +229,8 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                        if (taskManagerProcess3 != null) {
                                taskManagerProcess3.destroy();
                        }
-                       if (jmActorSystem != null) {
-                               jmActorSystem.shutdown();
-                       }
 
-                       if (highAvailabilityServices != null) {
-                               
highAvailabilityServices.closeAndCleanupAllData();
-                       }
+                       
clusterEntrypoint.shutDownAsync(ApplicationStatus.SUCCEEDED, null, true).get();
                }
        }
 
@@ -290,44 +244,6 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
         */
        public abstract void testTaskManagerFailure(int jobManagerPort, File 
coordinateDir) throws Exception;
 
-       protected void waitUntilNumTaskManagersAreRegistered(ActorRef 
jobManager, int numExpected, long maxDelayMillis)
-                       throws Exception {
-               final long pollInterval = 10_000_000; // 10 ms = 10,000,000 
nanos
-               final long deadline = System.nanoTime() + maxDelayMillis * 
1_000_000;
-
-               long time;
-
-               while ((time = System.nanoTime()) < deadline) {
-                       FiniteDuration timeout = new 
FiniteDuration(pollInterval, TimeUnit.NANOSECONDS);
-
-                       try {
-                               Future<?> result = Patterns.ask(jobManager,
-                                               
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-                                               new Timeout(timeout));
-
-                               int numTMs = (Integer) Await.result(result, 
timeout);
-
-                               if (numTMs == numExpected) {
-                                       return;
-                               }
-                       }
-                       catch (TimeoutException e) {
-                               // ignore and retry
-                       }
-                       catch (ClassCastException e) {
-                               fail("Wrong response: " + e.getMessage());
-                       }
-
-                       long timePassed = System.nanoTime() - time;
-                       long remainingMillis = (pollInterval - timePassed) / 
1_000_000;
-                       if (remainingMillis > 0) {
-                               Thread.sleep(remainingMillis);
-                       }
-               }
-
-               fail("The TaskManagers did not register within the expected 
time (" + maxDelayMillis + "msecs)");
-       }
-
        protected static void printProcessLog(String processName, String log) {
                if (log == null || log.length() == 0) {
                        return;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
index 7dc6f0c..4815c49 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -68,10 +67,9 @@ public class TaskManagerProcessFailureBatchRecoveryITCase 
extends AbstractTaskMa
        public void testTaskManagerFailure(int jobManagerPort, final File 
coordinateDir) throws Exception {
 
                final Configuration configuration = new Configuration();
-               configuration.setString(CoreOptions.MODE, 
CoreOptions.LEGACY_MODE);
                ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, 
configuration);
                env.setParallelism(PARALLELISM);
-               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
10000));
+               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
0L));
                env.getConfig().setExecutionMode(executionMode);
                env.getConfig().disableSysoutLogging();
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
index 766a799..fbf6b5b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -67,7 +66,6 @@ public class TaskManagerProcessFailureStreamingRecoveryITCase 
extends AbstractTa
                final File tempCheckpointDir = tempFolder.newFolder();
 
                final Configuration configuration = new Configuration();
-               configuration.setString(CoreOptions.MODE, 
CoreOptions.LEGACY_MODE);
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
                        "localhost",
                        jobManagerPort,

Reply via email to