[ https://issues.apache.org/jira/browse/FLINK-10530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649932#comment-16649932 ]
ASF GitHub Bot commented on FLINK-10530: ---------------------------------------- tillrohrmann closed pull request #6827: [FLINK-10530][tests] Harden ProcessFailureCancelingITCase and AbstractTaskManagerProcessFailureRecovery URL: https://github.com/apache/flink/pull/6827 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 5d7f26bb886..83298aa78ec 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -18,18 +18,19 @@ package org.apache.flink.test.recovery; +import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HeartbeatManagerOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.util.BlobServerResource; -import org.apache.flink.util.NetUtils; +import org.apache.flink.runtime.zookeeper.ZooKeeperResource; import 
org.apache.flink.util.TestLogger; import org.junit.Rule; @@ -42,6 +43,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; @@ -76,6 +79,9 @@ @Rule public final BlobServerResource blobServerResource = new BlobServerResource(); + @Rule + public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource(); + @Test public void testTaskManagerProcessFailure() throws Exception { @@ -89,18 +95,19 @@ public void testTaskManagerProcessFailure() throws Exception { File coordinateTempDir = null; - final int jobManagerPort = NetUtils.getAvailablePort(); - final int restPort = NetUtils.getAvailablePort(); - - Configuration jmConfig = new Configuration(); - jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); - jmConfig.setString(JobManagerOptions.ADDRESS, "localhost"); - jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort); - jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L); - jmConfig.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L); - jmConfig.setInteger(RestOptions.PORT, restPort); - - try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(jmConfig)) { + Configuration config = new Configuration(); + config.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 500L); + config.setLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath()); + 
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); + config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); + + try (final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config)) { // check that we run this test only if the java command // is available on this machine String javaCommand = getJavaCommandPath(); @@ -119,21 +126,28 @@ public void testTaskManagerProcessFailure() throws Exception { clusterEntrypoint.startCluster(); + final Map<String, String> keyValues = config.toMap(); + final ArrayList<String> commands = new ArrayList<>((keyValues.size() << 1) + 8); + // the TaskManager java command - String[] command = new String[] { - javaCommand, - "-Dlog.level=DEBUG", - "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), - "-Xms80m", "-Xmx80m", - "-classpath", getCurrentClasspath(), - TaskExecutorProcessEntryPoint.class.getName(), - String.valueOf(jobManagerPort) - }; + commands.add(javaCommand); + commands.add("-Dlog.level=DEBUG"); + commands.add("-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath()); + commands.add("-Xms80m"); + commands.add("-Xmx80m"); + commands.add("-classpath"); + commands.add(getCurrentClasspath()); + commands.add(AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName()); + + for (Map.Entry<String, String> keyValue: keyValues.entrySet()) { + commands.add("--" + keyValue.getKey()); + commands.add(keyValue.getValue()); + } // start the first two TaskManager processes - taskManagerProcess1 = new ProcessBuilder(command).start(); + taskManagerProcess1 = new ProcessBuilder(commands).start(); new CommonTestUtils.PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1); - taskManagerProcess2 = new ProcessBuilder(command).start(); + taskManagerProcess2 = new ProcessBuilder(commands).start(); new 
CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2); // the program will set a marker file in each of its parallel tasks once they are ready, so that @@ -148,7 +162,7 @@ public void testTaskManagerProcessFailure() throws Exception { @Override public void run() { try { - testTaskManagerFailure(restPort, coordinateDirClosure); + testTaskManagerFailure(config, coordinateDirClosure); } catch (Throwable t) { t.printStackTrace(); @@ -176,7 +190,7 @@ public void run() { } // start the third TaskManager - taskManagerProcess3 = new ProcessBuilder(command).start(); + taskManagerProcess3 = new ProcessBuilder(commands).start(); new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3); // kill one of the previous TaskManagers, triggering a failure and recovery @@ -232,11 +246,11 @@ public void run() { * The test program should be implemented here in a form of a separate thread. * This provides a solution for checking that it has been terminated. 
* - * @param jobManagerPort The port for submitting the topology to the local cluster + * @param configuration the config to use * @param coordinateDir TaskManager failure will be triggered only after processes * have successfully created file under this directory */ - public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception; + public abstract void testTaskManagerFailure(Configuration configuration, File coordinateDir) throws Exception; protected static void printProcessLog(String processName, String log) { if (log == null || log.length() == 0) { @@ -306,15 +320,8 @@ protected static boolean waitForMarkerFiles(File basedir, String prefix, int num public static void main(String[] args) { try { - int jobManagerPort = Integer.parseInt(args[0]); - - Configuration cfg = new Configuration(); - cfg.setString(JobManagerOptions.ADDRESS, "localhost"); - cfg.setInteger(JobManagerOptions.PORT, jobManagerPort); - cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); - cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); - cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); - cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); + final ParameterTool parameterTool = ParameterTool.fromArgs(args); + Configuration cfg = parameterTool.getConfiguration(); TaskManagerRunner.runTaskManager(cfg, ResourceID.generate()); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index afca8f12100..2e39bafe67f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -30,10 +30,12 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; 
+import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.Dispatcher; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; @@ -53,16 +55,20 @@ import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.zookeeper.ZooKeeperResource; import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.CheckedSupplier; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.StringWriter; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -84,6 +90,12 @@ @Rule public final BlobServerResource blobServerResource = new BlobServerResource(); + @Rule + public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource(); + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Test public void testCancelingOnProcessFailure() throws Exception { final StringWriter processOutput = new StringWriter(); @@ -93,23 +105,30 @@ public void testCancelingOnProcessFailure() throws Exception { Process taskManagerProcess = null; final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); - Configuration jmConfig = new Configuration(); - jmConfig.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); - 
jmConfig.setString(JobManagerOptions.ADDRESS, "localhost"); - jmConfig.setInteger(RestOptions.PORT, 0); - - final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, jmConfig); + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + config.setString(AkkaOptions.ASK_TIMEOUT, "100 s"); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath()); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); + config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); + + final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config); final int jobManagerPort = rpcService.getPort(); - jmConfig.setInteger(JobManagerOptions.PORT, jobManagerPort); + config.setInteger(JobManagerOptions.PORT, jobManagerPort); final SessionDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = new SessionDispatcherResourceManagerComponentFactory( StandaloneResourceManagerFactory.INSTANCE); DispatcherResourceManagerComponent<?> dispatcherResourceManagerComponent = null; - try (final HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - jmConfig, - TestingUtils.defaultExecutor(), - HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION)) { + final HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( + config, + TestingUtils.defaultExecutor(), + HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); + + try { // check that we run this test only if the java command // is available on this machine @@ -125,7 +144,7 @@ public void testCancelingOnProcessFailure() throws 
Exception { CommonTestUtils.printLog4jDebugConfig(tempLogFile); dispatcherResourceManagerComponent = resourceManagerComponentFactory.create( - jmConfig, + config, rpcService, haServices, blobServerResource.getBlobServer(), @@ -134,26 +153,26 @@ public void testCancelingOnProcessFailure() throws Exception { new MemoryArchivedExecutionGraphStore(), fatalErrorHandler); - // update the rest ports - final int restPort = dispatcherResourceManagerComponent - .getWebMonitorEndpoint() - .getServerAddress() - .getPort(); - jmConfig.setInteger(RestOptions.PORT, restPort); + final Map<String, String> keyValues = config.toMap(); + final ArrayList<String> commands = new ArrayList<>((keyValues.size() << 1) + 8); // the TaskManager java command - String[] command = new String[] { - javaCommand, - "-Dlog.level=DEBUG", - "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), - "-Xms80m", "-Xmx80m", - "-classpath", getCurrentClasspath(), - AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName(), - String.valueOf(jobManagerPort) - }; + commands.add(javaCommand); + commands.add("-Dlog.level=DEBUG"); + commands.add("-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath()); + commands.add("-Xms80m"); + commands.add("-Xmx80m"); + commands.add("-classpath"); + commands.add(getCurrentClasspath()); + commands.add(AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName()); + + for (Map.Entry<String, String> keyValue: keyValues.entrySet()) { + commands.add("--" + keyValue.getKey()); + commands.add(keyValue.getValue()); + } // start the first two TaskManager processes - taskManagerProcess = new ProcessBuilder(command).start(); + taskManagerProcess = new ProcessBuilder(commands).start(); new CommonTestUtils.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput); final Throwable[] errorRef = new Throwable[1]; @@ -163,7 +182,7 @@ public void testCancelingOnProcessFailure() throws Exception { @Override 
public void run() { try { - ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", restPort, new Configuration()); + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, config); env.setParallelism(2); env.setRestartStrategy(RestartStrategies.noRestart()); env.getConfig().disableSysoutLogging(); @@ -196,16 +215,10 @@ public Long map(Long value) throws Exception { // kill the TaskManager programThread.start(); - final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), Time.seconds(10L)); - - final DispatcherGateway dispatcherGateway = rpcService.connect( - leaderConnectionInfo.getAddress(), - DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()), - DispatcherGateway.class).get(); - + final DispatcherGateway dispatcherGateway = retrieveDispatcherGateway(rpcService, haServices); waitUntilAllSlotsAreUsed(dispatcherGateway, timeout); - clusterClient = new RestClusterClient<>(jmConfig, "standalone"); + clusterClient = new RestClusterClient<>(config, "standalone"); final Collection<JobID> jobIds = waitForRunningJobs(clusterClient, timeout); @@ -252,12 +265,31 @@ public Long map(Long value) throws Exception { dispatcherResourceManagerComponent.close(); } + haServices.closeAndCleanupAllData(); + fatalErrorHandler.rethrowError(); RpcUtils.terminateRpcService(rpcService, Time.seconds(10L)); } } + /** + * Helper method to wait until the {@link Dispatcher} has set its fencing token. 
+ * + * @param rpcService to use to connect to the dispatcher + * @param haServices high availability services to connect to the dispatcher + * @return {@link DispatcherGateway} + * @throws Exception if something goes wrong + */ + static DispatcherGateway retrieveDispatcherGateway(RpcService rpcService, HighAvailabilityServices haServices) throws Exception { + final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), Time.seconds(10L)); + + return rpcService.connect( + leaderConnectionInfo.getAddress(), + DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()), + DispatcherGateway.class).get(); + } + private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException { FutureUtils.retrySuccesfulWithDelay( () -> dispatcherGateway.requestClusterOverview(timeout), diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java index 4815c4938f7..473fb3959f8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureBatchRecoveryITCase.java @@ -64,10 +64,9 @@ public TaskManagerProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) // -------------------------------------------------------------------------------------------- @Override - public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception { + public void testTaskManagerFailure(Configuration configuration, final File coordinateDir) throws Exception { - final Configuration configuration = new Configuration(); - ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", 
jobManagerPort, configuration); + ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration); env.setParallelism(PARALLELISM); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L)); env.getConfig().setExecutionMode(executionMode); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java index fbf6b5b71e3..8ccb311be1c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerProcessFailureStreamingRecoveryITCase.java @@ -61,14 +61,13 @@ private static final int DATA_COUNT = 10000; @Override - public void testTaskManagerFailure(int jobManagerPort, final File coordinateDir) throws Exception { + public void testTaskManagerFailure(Configuration configuration, final File coordinateDir) throws Exception { final File tempCheckpointDir = tempFolder.newFolder(); - final Configuration configuration = new Configuration(); StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( "localhost", - jobManagerPort, + 1337, // not needed since we use ZooKeeper configuration); env.setParallelism(PARALLELISM); env.getConfig().disableSysoutLogging(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ProcessFailureCancelingITCase.testCancelingOnProcessFailure failed on Travis. 
> -----------------------------------------------------------------------------
>
> Key: FLINK-10530
> URL: https://issues.apache.org/jira/browse/FLINK-10530
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: Kostas Kloudas
> Assignee: Till Rohrmann
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.7.0
>
>
> The logs from Travis: https://api.travis-ci.org/v3/job/440109944/log.txt

--
This message was sent by Atlassian JIRA (v7.6.3#76005)