This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff97d1cb3efbac4f47abc0a38bac932a55fc1288
Author: Till Rohrmann <[email protected]>
AuthorDate: Sun Sep 23 00:28:30 2018 +0200

    [FLINK-10401] Port ProcessFailureCancelingITCase to new code base
    
    This closes #6749.
---
 .../DispatcherResourceManagerComponent.java        |  10 +
 .../flink/runtime/util/BlobServerResource.java     |   4 +
 ...tractTaskManagerProcessFailureRecoveryTest.java |  29 +++
 .../recovery/ProcessFailureCancelingITCase.java    | 256 ++++++++++-----------
 4 files changed, 159 insertions(+), 140 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
index 94925b2..b07095c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -116,6 +116,16 @@ public class DispatcherResourceManagerComponent<T extends Dispatcher> implements
                return shutDownFuture;
        }
 
+       @Nonnull
+       public T getDispatcher() {
+               return dispatcher;
+       }
+
+       @Nonnull
+       public WebMonitorEndpoint<?> getWebMonitorEndpoint() {
+               return webMonitorEndpoint;
+       }
+
        @Override
        public CompletableFuture<Void> closeAsync() {
                if (isRunning.compareAndSet(true, false)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
index 080ecf8..654b2bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java
@@ -62,4 +62,8 @@ public class BlobServerResource extends ExternalResource {
        public int getBlobServerPort() {
                return blobServer.getPort();
        }
+
+       public BlobServer getBlobServer() {
+               return blobServer;
+       }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 56327ad..5cd6f30 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -421,4 +422,32 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
                }
        }
 
+       /**
+        * The entry point for the TaskExecutor JVM. Simply configures and runs a TaskExecutor.
+        */
+       public static class TaskExecutorProcessEntryPoint {
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorProcessEntryPoint.class);
+
+               public static void main(String[] args) {
+                       try {
+                               int jobManagerPort = Integer.parseInt(args[0]);
+
+                               Configuration cfg = new Configuration();
+                               cfg.setString(JobManagerOptions.ADDRESS, 
"localhost");
+                               cfg.setInteger(JobManagerOptions.PORT, 
jobManagerPort);
+                               
cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+                               
cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+                               
cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+                               cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+
+                               TaskManagerRunner.runTaskManager(cfg, 
ResourceID.generate());
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Failed to start TaskManager 
process", t);
+                               System.exit(1);
+                       }
+               }
+       }
+
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index b85a410..afca8f1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -21,51 +21,58 @@ package org.apache.flink.test.recovery;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
+import 
org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.CheckedSupplier;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.StringWriter;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * This test makes sure that jobs are canceled properly in cases where
@@ -74,15 +81,36 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("serial")
 public class ProcessFailureCancelingITCase extends TestLogger {
 
+       @Rule
+       public final BlobServerResource blobServerResource = new 
BlobServerResource();
+
        @Test
        public void testCancelingOnProcessFailure() throws Exception {
                final StringWriter processOutput = new StringWriter();
+               final Time timeout = Time.minutes(2L);
 
-               ActorSystem jmActorSystem = null;
+               RestClusterClient<String> clusterClient = null;
                Process taskManagerProcess = null;
-               HighAvailabilityServices highAvailabilityServices = null;
+               final TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
+
+               Configuration jmConfig = new Configuration();
+               jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
+               jmConfig.setString(JobManagerOptions.ADDRESS, "localhost");
+               jmConfig.setInteger(RestOptions.PORT, 0);
+
+               final RpcService rpcService = 
AkkaRpcServiceUtils.createRpcService("localhost", 0, jmConfig);
+               final int jobManagerPort = rpcService.getPort();
+               jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort);
+
+               final SessionDispatcherResourceManagerComponentFactory 
resourceManagerComponentFactory = new 
SessionDispatcherResourceManagerComponentFactory(
+                       StandaloneResourceManagerFactory.INSTANCE);
+               DispatcherResourceManagerComponent<?> 
dispatcherResourceManagerComponent = null;
+
+               try (final HighAvailabilityServices haServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
+                               jmConfig,
+                               TestingUtils.defaultExecutor(),
+                               
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION)) {
 
-               try {
                        // check that we run this test only if the java command
                        // is available on this machine
                        String javaCommand = getJavaCommandPath();
@@ -96,36 +124,22 @@ public class ProcessFailureCancelingITCase extends TestLogger {
                        tempLogFile.deleteOnExit();
                        CommonTestUtils.printLog4jDebugConfig(tempLogFile);
 
-                       // find a free port to start the JobManager
-                       final int jobManagerPort = NetUtils.getAvailablePort();
-
-                       // start a JobManager
-                       Tuple2<String, Object> localAddress = new 
Tuple2<String, Object>("localhost", jobManagerPort);
-
-                       Configuration jmConfig = new Configuration();
-                       
jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s");
-                       jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, 
"2000 s");
-                       jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10);
-                       jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
-                       jmConfig.setString(JobManagerOptions.ADDRESS, 
localAddress._1());
-                       jmConfig.setInteger(JobManagerOptions.PORT, 
jobManagerPort);
-
-                       highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-                               jmConfig,
-                               TestingUtils.defaultExecutor(),
-                               
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-                       jmActorSystem = AkkaUtils.createActorSystem(jmConfig, 
new Some<>(localAddress));
-                       ActorRef jmActor = JobManager.startJobManagerActors(
+                       dispatcherResourceManagerComponent = 
resourceManagerComponentFactory.create(
                                jmConfig,
-                               jmActorSystem,
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               highAvailabilityServices,
+                               rpcService,
+                               haServices,
+                               blobServerResource.getBlobServer(),
+                               new HeartbeatServices(100L, 1000L),
                                NoOpMetricRegistry.INSTANCE,
-                               Option.empty(),
-                               JobManager.class,
-                               MemoryArchivist.class)._1();
+                               new MemoryArchivedExecutionGraphStore(),
+                               fatalErrorHandler);
+
+                       // update the rest ports
+                       final int restPort = dispatcherResourceManagerComponent
+                               .getWebMonitorEndpoint()
+                               .getServerAddress()
+                               .getPort();
+                       jmConfig.setInteger(RestOptions.PORT, restPort);
 
                        // the TaskManager java command
                        String[] command = new String[] {
@@ -134,7 +148,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
                                        "-Dlog4j.configuration=file:" + 
tempLogFile.getAbsolutePath(),
                                        "-Xms80m", "-Xmx80m",
                                        "-classpath", getCurrentClasspath(),
-                                       
AbstractTaskManagerProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(),
+                                       
AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName(),
                                        String.valueOf(jobManagerPort)
                        };
 
@@ -142,21 +156,14 @@ public class ProcessFailureCancelingITCase extends TestLogger {
                        taskManagerProcess = new 
ProcessBuilder(command).start();
                        new 
CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), 
processOutput);
 
-                       // we wait for the JobManager to have the two TaskManagers available
-                       // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes)
-                       waitUntilNumTaskManagersAreRegistered(jmActor, 1, 
120000);
-
                        final Throwable[] errorRef = new Throwable[1];
 
-                       final Configuration configuration = new Configuration();
-                       configuration.setString(CoreOptions.MODE, 
CoreOptions.LEGACY_MODE);
-
                        // start the test program, which infinitely blocks
                        Runnable programRunner = new Runnable() {
                                @Override
                                public void run() {
                                        try {
-                                               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, 
configuration);
+                                               ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("localhost", restPort, new 
Configuration());
                                                env.setParallelism(2);
                                                
env.setRestartStrategy(RestartStrategies.noRestart());
                                                
env.getConfig().disableSysoutLogging();
@@ -187,15 +194,30 @@ public class ProcessFailureCancelingITCase extends TestLogger {
                        Thread programThread = new Thread(programRunner);
 
                        // kill the TaskManager
+                       programThread.start();
+
+                       final LeaderConnectionInfo leaderConnectionInfo = 
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(),
 Time.seconds(10L));
+
+                       final DispatcherGateway dispatcherGateway = 
rpcService.connect(
+                               leaderConnectionInfo.getAddress(),
+                               
DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()),
+                               DispatcherGateway.class).get();
+
+                       waitUntilAllSlotsAreUsed(dispatcherGateway, timeout);
+
+                       clusterClient = new RestClusterClient<>(jmConfig, 
"standalone");
+
+                       final Collection<JobID> jobIds = 
waitForRunningJobs(clusterClient, timeout);
+
+                       assertThat(jobIds, hasSize(1));
+                       final JobID jobId = jobIds.iterator().next();
+
+                       // kill the TaskManager after the job started to run
                        taskManagerProcess.destroy();
                        taskManagerProcess = null;
 
-                       // immediately submit the job. this should hit the case
-                       // where the JobManager still thinks it has the TaskManager and tries to send it tasks
-                       programThread.start();
-
                        // try to cancel the job
-                       cancelRunningJob(jmActor);
+                       clusterClient.cancel(jobId);
 
                        // we should see a failure within reasonable time (10s is the ask timeout).
                        // since the CI environment is often slow, we conservatively give it up to 2 minutes,
@@ -223,88 +245,42 @@ public class ProcessFailureCancelingITCase extends TestLogger {
                        if (taskManagerProcess != null) {
                                taskManagerProcess.destroy();
                        }
-                       if (jmActorSystem != null) {
-                               jmActorSystem.shutdown();
-                       }
-
-                       if (highAvailabilityServices != null) {
-                               
highAvailabilityServices.closeAndCleanupAllData();
-                       }
-               }
-       }
-
-       private void cancelRunningJob(ActorRef jobManager) throws Exception {
-               final FiniteDuration askTimeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
-
-               // try at most for 30 seconds
-               final long deadline = System.currentTimeMillis() + 30000;
-
-               JobID jobId = null;
-
-               do {
-                       Future<Object> response = Patterns.ask(jobManager,
-                                       
JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
-
-                       Object result;
-                       try {
-                               result = Await.result(response, askTimeout);
+                       if (clusterClient != null) {
+                               clusterClient.shutdown();
                        }
-                       catch (Exception e) {
-                               throw new Exception("Could not retrieve running 
jobs from the JobManager.", e);
+                       if (dispatcherResourceManagerComponent != null) {
+                               dispatcherResourceManagerComponent.close();
                        }
 
-                       if (result instanceof 
JobManagerMessages.RunningJobsStatus) {
-
-                               List<JobStatusMessage> jobs = 
((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+                       fatalErrorHandler.rethrowError();
 
-                               if (jobs.size() == 1) {
-                                       jobId = jobs.get(0).getJobId();
-                                       break;
-                               }
-                       }
-               }
-               while (System.currentTimeMillis() < deadline);
-
-               if (jobId == null) {
-                       // we never found it running, must have failed already
-                       return;
+                       RpcUtils.terminateRpcService(rpcService, 
Time.seconds(10L));
                }
-
-               // tell the JobManager to cancel the job
-               jobManager.tell(
-                       new JobManagerMessages.LeaderSessionMessage(
-                               HighAvailabilityServices.DEFAULT_LEADER_ID,
-                               new JobManagerMessages.CancelJob(jobId)),
-                       ActorRef.noSender());
        }
 
-       private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, 
int numExpected, long maxDelay)
-                       throws Exception {
-               final long deadline = System.currentTimeMillis() + maxDelay;
-               while (true) {
-                       long remaining = deadline - System.currentTimeMillis();
-                       if (remaining <= 0) {
-                               fail("The TaskManagers did not register within 
the expected time (" + maxDelay + "msecs)");
-                       }
-
-                       FiniteDuration timeout = new FiniteDuration(remaining, 
TimeUnit.MILLISECONDS);
+       private void waitUntilAllSlotsAreUsed(DispatcherGateway 
dispatcherGateway, Time timeout) throws ExecutionException, 
InterruptedException {
+               FutureUtils.retrySuccesfulWithDelay(
+                       () -> dispatcherGateway.requestClusterOverview(timeout),
+                       Time.milliseconds(50L),
+                       
Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+                       clusterOverview -> 
clusterOverview.getNumTaskManagersConnected() >= 1 &&
+                               clusterOverview.getNumSlotsAvailable() == 0 &&
+                               clusterOverview.getNumSlotsTotal() == 2,
+                       TestingUtils.defaultScheduledExecutor())
+                       .get();
+       }
 
-                       try {
-                               Future<?> result = Patterns.ask(jobManager,
-                                               
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-                                               new Timeout(timeout));
-                               Integer numTMs = (Integer) Await.result(result, 
timeout);
-                               if (numTMs == numExpected) {
-                                       break;
-                               }
-                       }
-                       catch (TimeoutException e) {
-                               // ignore and retry
-                       }
-                       catch (ClassCastException e) {
-                               fail("Wrong response: " + e.getMessage());
-                       }
-               }
+       private Collection<JobID> waitForRunningJobs(ClusterClient<?> 
clusterClient, Time timeout) throws ExecutionException, InterruptedException {
+               return FutureUtils.retrySuccesfulWithDelay(
+                               
CheckedSupplier.unchecked(clusterClient::listJobs),
+                               Time.milliseconds(50L),
+                               
Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())),
+                               jobs -> !jobs.isEmpty(),
+                               TestingUtils.defaultScheduledExecutor())
+                       .get()
+                       .stream()
+                       .map(JobStatusMessage::getJobId)
+                       .collect(Collectors.toList());
        }
 
        private void printProcessLog(String processName, String log) {

Reply via email to