This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6ace721e5515a8fcc085e501bcfc8586551d1d36 Author: Till Rohrmann <[email protected]> AuthorDate: Mon Sep 24 08:56:58 2018 +0200 [FLINK-10403] Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base This closes #6751. --- .../flink/runtime/testutils/DispatcherProcess.java | 179 +++++++++++++++++++++ ...ManagerHAProcessFailureBatchRecoveryITCase.java | 144 +++++++++-------- 2 files changed, 253 insertions(+), 70 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java new file mode 100644 index 0000000..79b0dc3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/DispatcherProcess.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint; +import org.apache.flink.runtime.jobmanager.JobManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link Dispatcher} instance running in a separate JVM. + */ +public class DispatcherProcess extends TestJvmProcess { + + private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class); + + /** Pattern to parse the job manager port from the logs. */ + private static final Pattern PORT_PATTERN = Pattern.compile(".*Actor system started at akka\\.tcp://flink@.*:(\\d+).*"); + + /** ID for this JobManager. */ + private final int id; + + /** The configuration for the JobManager. */ + private final Configuration config; + + /** Configuration parsed as args for {@link DispatcherProcessEntryPoint}. */ + private final String[] jvmArgs; + + /** The port the JobManager listens on. */ + private int jobManagerPort; + + /** + * Creates a {@link JobManager} running in a separate JVM. 
+ * + * @param id ID for the JobManager + * @param config Configuration for the job manager process + * + * @throws Exception + */ + public DispatcherProcess(int id, Configuration config) throws Exception { + checkArgument(id >= 0, "Negative ID"); + this.id = id; + this.config = checkNotNull(config, "Configuration"); + + ArrayList<String> args = new ArrayList<>(); + + for (Map.Entry<String, String> entry : config.toMap().entrySet()) { + args.add("--" + entry.getKey()); + args.add(entry.getValue()); + } + + this.jvmArgs = new String[args.size()]; + args.toArray(jvmArgs); + } + + @Override + public String getName() { + return "JobManager " + id; + } + + @Override + public String[] getJvmArgs() { + return jvmArgs; + } + + @Override + public String getEntryPointClassName() { + return DispatcherProcessEntryPoint.class.getName(); + } + + public Configuration getConfig() { + return config; + } + + /** + * Parses the port from the job manager logs and returns it. + * + * <p>If a call to this method succeeds, successive calls will directly + * return the port and not re-parse the logs. + * + * @param timeout Timeout for log parsing. 
+ * @return The port of the job manager + * @throws InterruptedException If interrupted while waiting before + * retrying to parse the logs + * @throws NumberFormatException If the parsed port is not a number + */ + public int getJobManagerPort(FiniteDuration timeout) throws InterruptedException, NumberFormatException { + if (jobManagerPort > 0) { + return jobManagerPort; + } else { + Deadline deadline = timeout.fromNow(); + while (deadline.hasTimeLeft()) { + Matcher matcher = PORT_PATTERN.matcher(getProcessOutput()); + if (matcher.find()) { + String port = matcher.group(1); + jobManagerPort = Integer.parseInt(port); + return jobManagerPort; + } else { + Thread.sleep(100); + } + } + + throw new RuntimeException("Could not parse port from logs"); + } + } + + @Override + public String toString() { + return String.format("JobManagerProcess(id=%d, port=%d)", id, jobManagerPort); + } + + /** + * Entry point for the JobManager process. + */ + public static class DispatcherProcessEntryPoint { + + private static final Logger LOG = LoggerFactory.getLogger(DispatcherProcessEntryPoint.class); + + /** + * Entrypoint of the DispatcherProcessEntryPoint. + * + * <p>Other arguments are parsed to a {@link Configuration} and passed to the + * JobManager, for instance: <code>--high-availability ZOOKEEPER --high-availability.zookeeper.quorum + * "xyz:123:456"</code>. 
+ */ + public static void main(String[] args) { + try { + ParameterTool params = ParameterTool.fromArgs(args); + Configuration config = params.getConfiguration(); + LOG.info("Configuration: {}.", config); + + config.setInteger(JobManagerOptions.PORT, 0); + config.setInteger(RestOptions.PORT, 0); + + final StandaloneSessionClusterEntrypoint clusterEntrypoint = new StandaloneSessionClusterEntrypoint(config); + + ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint); + } + catch (Throwable t) { + LOG.error("Failed to start JobManager process", t); + System.exit(1); + } + } + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index d3accff..9e9ce07 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -22,36 +22,38 @@ import org.apache.flink.api.common.ExecutionMode; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import 
org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.metrics.NoOpMetricRegistry; -import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; +import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; -import org.apache.flink.runtime.testutils.JobManagerProcess; +import org.apache.flink.runtime.testutils.DispatcherProcess; import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; import org.apache.commons.io.FileUtils; import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -59,12 +61,15 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.File; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.UUID; +import java.util.concurrent.CompletableFuture; 
+import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import scala.Option; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; @@ -90,23 +95,28 @@ import static org.junit.Assert.fail; @RunWith(Parameterized.class) public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { - private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); + private static ZooKeeperTestEnvironment zooKeeper; private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES); @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - @AfterClass - public static void tearDown() throws Exception { - if (ZooKeeper != null) { - ZooKeeper.shutdown(); - } + @BeforeClass + public static void setup() { + zooKeeper = new ZooKeeperTestEnvironment(1); } @Before public void cleanUp() throws Exception { - ZooKeeper.deleteAll(); + zooKeeper.deleteAll(); + } + + @AfterClass + public static void tearDown() throws Exception { + if (zooKeeper != null) { + zooKeeper.shutdown(); + } } protected static final String READY_MARKER_FILE_PREFIX = "ready_"; @@ -141,7 +151,6 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { */ private void testJobManagerFailure(String zkQuorum, final File coordinateDir, final File zookeeperStoragePath) throws Exception { Configuration config = new Configuration(); - config.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, zookeeperStoragePath.getAbsolutePath()); @@ -149,7 +158,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( "leader", 1, 
config); env.setParallelism(PARALLELISM); - env.setNumberOfExecutionRetries(1); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L)); env.getConfig().setExecutionMode(executionMode); env.getConfig().disableSysoutLogging(); @@ -212,7 +221,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { } @Test - public void testJobManagerProcessFailure() throws Exception { + public void testDispatcherProcessFailure() throws Exception { + final Time timeout = Time.seconds(30L); final File zookeeperStoragePath = temporaryFolder.newFolder(); // Config @@ -222,15 +232,11 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager); - // Setup - // Test actor system - ActorSystem testActorSystem; - // Job managers - final JobManagerProcess[] jmProcess = new JobManagerProcess[numberOfJobManagers]; + final DispatcherProcess[] dispatcherProcesses = new DispatcherProcess[numberOfJobManagers]; // Task managers - final ActorSystem[] tmActorSystem = new ActorSystem[numberOfTaskManagers]; + TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numberOfTaskManagers]; HighAvailabilityServices highAvailabilityServices = null; @@ -239,24 +245,25 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { // Coordination between the processes goes through a directory File coordinateTempDir = null; + // Cluster config + Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( + zooKeeper.getConnectString(), zookeeperStoragePath.getPath()); + // Task manager configuration + config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); + + final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config); + try { final Deadline deadline = TestTimeOut.fromNow(); 
// Coordination directory coordinateTempDir = temporaryFolder.newFolder(); - // Job Managers - Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( - ZooKeeper.getConnectString(), zookeeperStoragePath.getPath()); - // Start first process - jmProcess[0] = new JobManagerProcess(0, config); - jmProcess[0].startProcess(); - - // Task manager configuration - config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); - config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); + dispatcherProcesses[0] = new DispatcherProcess(0, config); + dispatcherProcesses[0].startProcess(); highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( config, @@ -264,27 +271,13 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { // Start the task manager process for (int i = 0; i < numberOfTaskManagers; i++) { - tmActorSystem[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); - TaskManager.startTaskManagerComponentsAndActor( - config, - ResourceID.generate(), - tmActorSystem[i], - highAvailabilityServices, - NoOpMetricRegistry.INSTANCE, - "localhost", - Option.<String>empty(), - false, - TaskManager.class); + taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate()); + taskManagerRunners[i].start(); } - // Test actor system - testActorSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); - - jmProcess[0].getActorRef(testActorSystem, deadline.timeLeft()); - // Leader listener TestingListener leaderListener = new TestingListener(); - leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID); + leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever(); leaderRetrievalService.start(leaderListener); // Initial submission @@ -293,13 +286,14 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends 
TestLogger { String leaderAddress = leaderListener.getAddress(); UUID leaderId = leaderListener.getLeaderSessionID(); - // Get the leader ref - ActorRef leaderRef = AkkaUtils.getActorRef(leaderAddress, testActorSystem, deadline.timeLeft()); - ActorGateway leaderGateway = new AkkaActorGateway(leaderRef, leaderId); + final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = rpcService.connect( + leaderAddress, + DispatcherId.fromUuid(leaderId), + DispatcherGateway.class); + final DispatcherGateway dispatcherGateway = dispatcherGatewayFuture.get(); // Wait for all task managers to connect to the leading job manager - JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers, leaderGateway, - deadline.timeLeft()); + waitForTaskManagers(numberOfTaskManagers, dispatcherGateway, deadline.timeLeft()); final File coordinateDirClosure = coordinateTempDir; final Throwable[] errorRef = new Throwable[1]; @@ -309,7 +303,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { @Override public void run() { try { - testJobManagerFailure(ZooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath); + testJobManagerFailure(zooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath); } catch (Throwable t) { t.printStackTrace(); @@ -326,12 +320,10 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { READY_MARKER_FILE_PREFIX, PARALLELISM, deadline.timeLeft().toMillis()); // Kill one of the job managers and trigger recovery - jmProcess[0].destroy(); + dispatcherProcesses[0].destroy(); - jmProcess[1] = new JobManagerProcess(1, config); - jmProcess[1].startProcess(); - - jmProcess[1].getActorRef(testActorSystem, deadline.timeLeft()); + dispatcherProcesses[1] = new DispatcherProcess(1, config); + dispatcherProcesses[1].startProcess(); // we create the marker file which signals the program functions tasks that they can complete AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new 
File(coordinateTempDir, PROCEED_MARKER_FILE)); @@ -358,7 +350,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { // for Travis and the root problem is not shown) t.printStackTrace(); - for (JobManagerProcess p : jmProcess) { + for (DispatcherProcess p : dispatcherProcesses) { if (p != null) { p.printProcessLog(); } @@ -368,8 +360,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { } finally { for (int i = 0; i < numberOfTaskManagers; i++) { - if (tmActorSystem[i] != null) { - tmActorSystem[i].shutdown(); + if (taskManagerRunners[i] != null) { + taskManagerRunners[i].close(); } } @@ -377,7 +369,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { leaderRetrievalService.stop(); } - for (JobManagerProcess jmProces : jmProcess) { + for (DispatcherProcess jmProces : dispatcherProcesses) { if (jmProces != null) { jmProces.destroy(); } @@ -387,6 +379,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { highAvailabilityServices.closeAndCleanupAllData(); } + RpcUtils.terminateRpcService(rpcService, timeout); + // Delete coordination directory if (coordinateTempDir != null) { try { @@ -398,4 +392,14 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { } } + private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, FiniteDuration timeLeft) throws ExecutionException, InterruptedException { + FutureUtils.retrySuccesfulWithDelay( + () -> dispatcherGateway.requestClusterOverview(Time.milliseconds(timeLeft.toMillis())), + Time.milliseconds(50L), + org.apache.flink.api.common.time.Deadline.fromNow(Duration.ofMillis(timeLeft.toMillis())), + clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers, + new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor())) + .get(); + } + }
