This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ff97d1cb3efbac4f47abc0a38bac932a55fc1288 Author: Till Rohrmann <[email protected]> AuthorDate: Sun Sep 23 00:28:30 2018 +0200 [FLINK-10401] Port ProcessFailureCancelingITCase to new code base This closes #6749. --- .../DispatcherResourceManagerComponent.java | 10 + .../flink/runtime/util/BlobServerResource.java | 4 + ...tractTaskManagerProcessFailureRecoveryTest.java | 29 +++ .../recovery/ProcessFailureCancelingITCase.java | 256 ++++++++++----------- 4 files changed, 159 insertions(+), 140 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java index 94925b2..b07095c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java @@ -116,6 +116,16 @@ public class DispatcherResourceManagerComponent<T extends Dispatcher> implements return shutDownFuture; } + @Nonnull + public T getDispatcher() { + return dispatcher; + } + + @Nonnull + public WebMonitorEndpoint<?> getWebMonitorEndpoint() { + return webMonitorEndpoint; + } + @Override public CompletableFuture<Void> closeAsync() { if (isRunning.compareAndSet(true, false)) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java index 080ecf8..654b2bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlobServerResource.java @@ -62,4 +62,8 @@ public class BlobServerResource extends ExternalResource { public int getBlobServerPort() { return blobServer.getPort(); } + + public BlobServer getBlobServer() { + return blobServer; + } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 56327ad..5cd6f30 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.CommonTestUtils; @@ -421,4 +422,32 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test } } + /** + * The entry point for the TaskExecutor JVM. Simply configures and runs a TaskExecutor. + */ + public static class TaskExecutorProcessEntryPoint { + + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorProcessEntryPoint.class); + + public static void main(String[] args) { + try { + int jobManagerPort = Integer.parseInt(args[0]); + + Configuration cfg = new Configuration(); + cfg.setString(JobManagerOptions.ADDRESS, "localhost"); + cfg.setInteger(JobManagerOptions.PORT, jobManagerPort); + cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); + cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); + cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); + cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); + + TaskManagerRunner.runTaskManager(cfg, ResourceID.generate()); + } + catch (Throwable t) { + LOG.error("Failed to start TaskManager process", t); + System.exit(1); + } + } + } + } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index b85a410..afca8f1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -21,51 +21,58 @@ package org.apache.flink.test.recovery; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent; +import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.jobmanager.JobManager; -import org.apache.flink.runtime.jobmanager.MemoryArchivist; -import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.util.NetUtils; +import org.apache.flink.runtime.util.BlobServerResource; +import org.apache.flink.runtime.util.LeaderConnectionInfo; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.CheckedSupplier; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.pattern.Patterns; -import akka.util.Timeout; +import org.junit.Rule; import org.junit.Test; import java.io.File; import java.io.StringWriter; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import scala.Option; -import scala.Some; -import scala.Tuple2; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * This test makes sure that jobs are canceled properly in cases where @@ -74,15 +81,36 @@ import static org.junit.Assert.fail; @SuppressWarnings("serial") public class ProcessFailureCancelingITCase extends TestLogger { + @Rule + public final BlobServerResource blobServerResource = new BlobServerResource(); + @Test public void testCancelingOnProcessFailure() throws Exception { final StringWriter processOutput = new StringWriter(); + final Time timeout = Time.minutes(2L); - ActorSystem jmActorSystem = null; + RestClusterClient<String> clusterClient = null; Process taskManagerProcess = null; - HighAvailabilityServices highAvailabilityServices = null; + final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); + + Configuration jmConfig = new Configuration(); + jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); + jmConfig.setString(JobManagerOptions.ADDRESS, "localhost"); + jmConfig.setInteger(RestOptions.PORT, 0); + + final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, jmConfig); + final int jobManagerPort = rpcService.getPort(); + jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort); + + final SessionDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = new SessionDispatcherResourceManagerComponentFactory( + StandaloneResourceManagerFactory.INSTANCE); + DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent = null; + + try (final HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( + jmConfig, + TestingUtils.defaultExecutor(), + HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION)) { - try { // check that we run this test only if the java command // is available on this machine String javaCommand = getJavaCommandPath(); @@ -96,36 +124,22 @@ public class ProcessFailureCancelingITCase extends TestLogger { tempLogFile.deleteOnExit(); CommonTestUtils.printLog4jDebugConfig(tempLogFile); - // find a free port to start the JobManager - final int jobManagerPort = NetUtils.getAvailablePort(); - - // start a JobManager - Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort); - - Configuration jmConfig = new Configuration(); - jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "5 s"); - jmConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2000 s"); - jmConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 10); - jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); - jmConfig.setString(JobManagerOptions.ADDRESS, localAddress._1()); - jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort); - - highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - jmConfig, - TestingUtils.defaultExecutor(), - HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); - - jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<>(localAddress)); - ActorRef jmActor = JobManager.startJobManagerActors( + dispatcherResourceManagerComponent = resourceManagerComponentFactory.create( jmConfig, - jmActorSystem, - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - highAvailabilityServices, + rpcService, + haServices, + blobServerResource.getBlobServer(), + new HeartbeatServices(100L, 1000L), NoOpMetricRegistry.INSTANCE, - Option.empty(), - JobManager.class, - MemoryArchivist.class)._1(); + new MemoryArchivedExecutionGraphStore(), + fatalErrorHandler); + + // update the rest ports + final int restPort = dispatcherResourceManagerComponent + .getWebMonitorEndpoint() + .getServerAddress() + .getPort(); + jmConfig.setInteger(RestOptions.PORT, restPort); // the TaskManager java command String[] command = new String[] { @@ -134,7 +148,7 @@ public class ProcessFailureCancelingITCase extends TestLogger { "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), "-Xms80m", "-Xmx80m", "-classpath", getCurrentClasspath(), - AbstractTaskManagerProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(), + AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName(), String.valueOf(jobManagerPort) }; @@ -142,21 +156,14 @@ public class ProcessFailureCancelingITCase extends TestLogger { taskManagerProcess = new ProcessBuilder(command).start(); new CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput); - // we wait for the JobManager to have the two TaskManagers available - // since some of the CI environments are very hostile, we need to give this a lot of time (2 minutes) - waitUntilNumTaskManagersAreRegistered(jmActor, 1, 120000); - final Throwable[] errorRef = new Throwable[1]; - final Configuration configuration = new Configuration(); - configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE); - // start the test program, which infinitely blocks Runnable programRunner = new Runnable() { @Override public void run() { try { - ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort, configuration); + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", restPort, new Configuration()); env.setParallelism(2); env.setRestartStrategy(RestartStrategies.noRestart()); env.getConfig().disableSysoutLogging(); @@ -187,15 +194,30 @@ public class ProcessFailureCancelingITCase extends TestLogger { Thread programThread = new Thread(programRunner); // kill the TaskManager + programThread.start(); + + final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), Time.seconds(10L)); + + final DispatcherGateway dispatcherGateway = rpcService.connect( + leaderConnectionInfo.getAddress(), + DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()), + DispatcherGateway.class).get(); + + waitUntilAllSlotsAreUsed(dispatcherGateway, timeout); + + clusterClient = new RestClusterClient<>(jmConfig, "standalone"); + + final Collection<JobID> jobIds = waitForRunningJobs(clusterClient, timeout); + + assertThat(jobIds, hasSize(1)); + final JobID jobId = jobIds.iterator().next(); + + // kill the TaskManager after the job started to run taskManagerProcess.destroy(); taskManagerProcess = null; - // immediately submit the job. this should hit the case - // where the JobManager still thinks it has the TaskManager and tries to send it tasks - programThread.start(); - // try to cancel the job - cancelRunningJob(jmActor); + clusterClient.cancel(jobId); // we should see a failure within reasonable time (10s is the ask timeout). // since the CI environment is often slow, we conservatively give it up to 2 minutes, @@ -223,88 +245,42 @@ public class ProcessFailureCancelingITCase extends TestLogger { if (taskManagerProcess != null) { taskManagerProcess.destroy(); } - if (jmActorSystem != null) { - jmActorSystem.shutdown(); - } - - if (highAvailabilityServices != null) { - highAvailabilityServices.closeAndCleanupAllData(); - } - } - } - - private void cancelRunningJob(ActorRef jobManager) throws Exception { - final FiniteDuration askTimeout = new FiniteDuration(10, TimeUnit.SECONDS); - - // try at most for 30 seconds - final long deadline = System.currentTimeMillis() + 30000; - - JobID jobId = null; - - do { - Future<Object> response = Patterns.ask(jobManager, - JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout)); - - Object result; - try { - result = Await.result(response, askTimeout); + if (clusterClient != null) { + clusterClient.shutdown(); } - catch (Exception e) { - throw new Exception("Could not retrieve running jobs from the JobManager.", e); + if (dispatcherResourceManagerComponent != null) { + dispatcherResourceManagerComponent.close(); } - if (result instanceof JobManagerMessages.RunningJobsStatus) { - - List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); + fatalErrorHandler.rethrowError(); - if (jobs.size() == 1) { - jobId = jobs.get(0).getJobId(); - break; - } - } - } - while (System.currentTimeMillis() < deadline); - - if (jobId == null) { - // we never found it running, must have failed already - return; + RpcUtils.terminateRpcService(rpcService, Time.seconds(10L)); } - - // tell the JobManager to cancel the job - jobManager.tell( - new JobManagerMessages.LeaderSessionMessage( - HighAvailabilityServices.DEFAULT_LEADER_ID, - new JobManagerMessages.CancelJob(jobId)), - ActorRef.noSender()); } - private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay) - throws Exception { - final long deadline = System.currentTimeMillis() + maxDelay; - while (true) { - long remaining = deadline - System.currentTimeMillis(); - if (remaining <= 0) { - fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)"); - } - - FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS); + private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException { + FutureUtils.retrySuccesfulWithDelay( + () -> dispatcherGateway.requestClusterOverview(timeout), + Time.milliseconds(50L), + Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())), + clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= 1 && + clusterOverview.getNumSlotsAvailable() == 0 && + clusterOverview.getNumSlotsTotal() == 2, + TestingUtils.defaultScheduledExecutor()) + .get(); + } - try { - Future<?> result = Patterns.ask(jobManager, - JobManagerMessages.getRequestNumberRegisteredTaskManager(), - new Timeout(timeout)); - Integer numTMs = (Integer) Await.result(result, timeout); - if (numTMs == numExpected) { - break; - } - } - catch (TimeoutException e) { - // ignore and retry - } - catch (ClassCastException e) { - fail("Wrong response: " + e.getMessage()); - } - } + private Collection<JobID> waitForRunningJobs(ClusterClient<?> clusterClient, Time timeout) throws ExecutionException, InterruptedException { + return FutureUtils.retrySuccesfulWithDelay( + CheckedSupplier.unchecked(clusterClient::listJobs), + Time.milliseconds(50L), + Deadline.fromNow(Duration.ofMillis(timeout.toMilliseconds())), + jobs -> !jobs.isEmpty(), + TestingUtils.defaultScheduledExecutor()) + .get() + .stream() + .map(JobStatusMessage::getJobId) + .collect(Collectors.toList()); } private void printProcessLog(String processName, String log) {
