tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
URL: https://github.com/apache/flink/pull/7546#discussion_r250225736
########## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ########## @@ -187,178 +188,146 @@ public void perJobYarnClusterOffHeap() throws IOException { } /** - * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213). + * Starts a session cluster on YARN, and submits a streaming job. + * + * <p>Tests + * <ul> + * <li>if a custom YARN application name can be set from the command line, + * <li>if the number of TaskManager slots can be set from the command line, + * <li>if dynamic properties from the command line are set, + * <li>if the vcores are set correctly (FLINK-2213), + * <li>if jobmanager hostname/port are shown in web interface (FLINK-1902) + * </ul> + * + * <p><b>Hint: </b> If you think it is a good idea to add more assertions to this test, think again! */ - @Test(timeout = 100000) // timeout after 100 seconds - public void testTaskManagerFailure() throws Exception { - assumeTrue("The new mode does not start TMs upfront.", !isNewMode); - LOG.info("Starting testTaskManagerFailure()"); - Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", + @Test(timeout = 100_000) + public void testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots() throws Exception { + final Runner yarnSessionClusterRunner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-jm", "768m", "-tm", "1024m", "-s", "3", // set the slots 3 to check if the vCores are set properly! "-nm", "customName", "-Dfancy-configuration-value=veryFancy", "-Dyarn.maximum-failed-containers=3", "-D" + YarnConfigOptions.VCORES.key() + "=2"}, - "Number of connected TaskManagers changed to 1. 
Slots available: 3", + "Flink JobManager is now running on ", RunTypes.YARN_SESSION); - Assert.assertEquals(2, getRunningContainers()); + final String logs = outContent.toString(); + final String hostname = parseJobManagerHostname(logs); + LOG.info("Extracted hostname: {}", hostname); - // ------------------------ Test if JobManager web interface is accessible ------- + final ApplicationReport applicationReport = getOnlyApplicationReport(); + final String restApiBaseUrl = normalizeTrackingUrl(applicationReport.getTrackingUrl()); + LOG.info("Got application URL from YARN {}", restApiBaseUrl); - final YarnClient yc = YarnClient.createYarnClient(); - yc.init(YARN_CONFIGURATION); - yc.start(); + submitJob("WindowJoin.jar"); + waitForTaskManagerRegistration(restApiBaseUrl, Duration.ofMillis(30_000)); - List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); - Assert.assertEquals(1, apps.size()); // Only one running - ApplicationReport app = apps.get(0); - Assert.assertEquals("customName", app.getName()); - String url = app.getTrackingUrl(); - if (!url.endsWith("/")) { - url += "/"; - } - if (!url.startsWith("http://")) { - url = "http://" + url; - } - LOG.info("Got application URL from YARN {}", url); + // + // Assert that custom YARN application name "customName" is set + // + assertEquals("customName", applicationReport.getName()); - String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/"); + // + // Assert the number of TaskManager slots are set + // + assertNumberOfSlotsPerTask(restApiBaseUrl, 3); - JsonNode parsedTMs = new ObjectMapper().readTree(response); - ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers"); - Assert.assertNotNull(taskManagers); - Assert.assertEquals(1, taskManagers.size()); - Assert.assertEquals(3, taskManagers.get(0).get("slotsNumber").asInt()); + final Map<String, String> flinkConfig = getFlinkConfig(restApiBaseUrl); - // get the configuration from webinterface & check if the 
dynamic properties from YARN show up there. - String jsonConfig = TestBaseUtils.getFromHTTP(url + "jobmanager/config"); - Map<String, String> parsedConfig = WebMonitorUtils.fromKeyValueJsonArray(jsonConfig); + // + // Assert dynamic properties + // + assertThat(flinkConfig, hasEntry("fancy-configuration-value", "veryFancy")); + assertThat(flinkConfig, hasEntry("yarn.maximum-failed-containers", "3")); + + // + // FLINK-2213: assert that vcores are set + // + assertThat(flinkConfig, hasEntry(YarnConfigOptions.VCORES.key(), "2")); - Assert.assertEquals("veryFancy", parsedConfig.get("fancy-configuration-value")); - Assert.assertEquals("3", parsedConfig.get("yarn.maximum-failed-containers")); - Assert.assertEquals("2", parsedConfig.get(YarnConfigOptions.VCORES.key())); + // + // FLINK-1902: check if jobmanager hostname is shown in web interface + // + assertThat(flinkConfig, hasEntry(JobManagerOptions.ADDRESS.key(), hostname)); - // -------------- FLINK-1902: check if jobmanager hostname/port are shown in web interface - // first, get the hostname/port - String oC = outContent.toString(); - Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)"); - Matcher matches = p.matcher(oC); + yarnSessionClusterRunner.sendStop(); + yarnSessionClusterRunner.join(); + } + + private static String parseJobManagerHostname(final String logs) { + final Pattern p = Pattern.compile("Flink JobManager is now running on ([a-zA-Z0-9.-]+):([0-9]+)"); + final Matcher matches = p.matcher(logs); String hostname = null; - String port = null; + while (matches.find()) { hostname = matches.group(1).toLowerCase(); - port = matches.group(2); } - LOG.info("Extracted hostname:port: {} {}", hostname, port); - - Assert.assertEquals("unable to find hostname in " + jsonConfig, hostname, - parsedConfig.get(JobManagerOptions.ADDRESS.key())); - Assert.assertEquals("unable to find port in " + jsonConfig, port, - parsedConfig.get(JobManagerOptions.PORT.key())); - - // test 
logfile access - String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log"); - Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster")); - Assert.assertTrue(logs.contains("Starting JobManager")); - Assert.assertTrue(logs.contains("Starting JobManager Web Frontend")); - - // ------------------------ Kill container with TaskManager and check if vcores are set correctly ------- - - // find container id of taskManager: - ContainerId taskManagerContainer = null; - NodeManager nodeManager = null; - UserGroupInformation remoteUgi = null; - NMTokenIdentifier nmIdent = null; - try { - remoteUgi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - LOG.warn("Unable to get curr user", e); - Assert.fail(); - } - for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) { - NodeManager nm = yarnCluster.getNodeManager(nmId); - ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers(); - for (Map.Entry<ContainerId, Container> entry : containers.entrySet()) { - String command = StringUtils.join(entry.getValue().getLaunchContext().getCommands(), " "); - if (command.contains(YarnTaskManager.class.getSimpleName())) { - taskManagerContainer = entry.getKey(); - nodeManager = nm; - nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "", 0); - // allow myself to do stuff with the container - // remoteUgi.addCredentials(entry.getValue().getCredentials()); - remoteUgi.addTokenIdentifier(nmIdent); - } - } - sleep(500); - } - - Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer); - Assert.assertNotNull("Illegal state", nodeManager); - yc.stop(); + assertNotNull("hostname not found in log", hostname); - List<ContainerId> toStop = new LinkedList<ContainerId>(); - toStop.add(taskManagerContainer); - StopContainersRequest scr = StopContainersRequest.newInstance(toStop); + return hostname; + } - try { - nodeManager.getNMContext().getContainerManager().stopContainers(scr); - 
} catch (Throwable e) { - LOG.warn("Error stopping container", e); - Assert.fail("Error stopping container: " + e.getMessage()); - } + private ApplicationReport getOnlyApplicationReport() throws IOException, YarnException { + final YarnClient yarnClient = getYarnClient(); + checkState(yarnClient != null); - // stateful termination check: - // wait until we saw a container being killed and AFTERWARDS a new one launched - boolean ok = false; - do { - LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString()); - - String o = errContent.toString(); - int killedOff = o.indexOf("Container killed by the ApplicationMaster"); - if (killedOff != -1) { - o = o.substring(killedOff); - ok = o.indexOf("Launching TaskManager") > 0; - } - sleep(1000); - } while(!ok); + final List<ApplicationReport> apps = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); + assertEquals(1, apps.size()); // Only one running + return apps.get(0); + } - // send "stop" command to command line interface - runner.sendStop(); - // wait for the thread to stop - try { - runner.join(); - } catch (InterruptedException e) { - LOG.warn("Interrupted while stopping runner", e); + private static String normalizeTrackingUrl(final String trackingUrl) { + String url = trackingUrl; + if (!url.endsWith("/")) { + url += "/"; + } + if (!url.startsWith("http://")) { + url = "http://" + url; } - LOG.warn("stopped"); + return url; + } - // ----------- Send output to logger - System.setOut(ORIGINAL_STDOUT); - System.setErr(ORIGINAL_STDERR); - oC = outContent.toString(); - String eC = errContent.toString(); - LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC); - LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC); + private void submitJob(final String jobFileName) throws IOException, InterruptedException { + Runner jobRunner = startWithArgs(new String[]{"run", + "--detached", getTestJarPath(jobFileName).getAbsolutePath()}, + "Job has been submitted with 
JobID", RunTypes.CLI_FRONTEND); + jobRunner.join(); + } - // ------ Check if everything happened correctly - Assert.assertTrue("Expect to see failed container", - eC.contains("New messages from the YARN cluster")); + private static void waitForTaskManagerRegistration(final String url, final Duration waitDuration) throws Exception { + waitUntilCondition(() -> getNumberOfTaskManagers(url) > 0, Deadline.fromNow(waitDuration)); + } - Assert.assertTrue("Expect to see failed container", - eC.contains("Container killed by the ApplicationMaster")); + private static void assertNumberOfSlotsPerTask(final String url, final int slotsNumber) throws Exception { + try { + waitUntilCondition(() -> getNumberOfSlotsPerTaskManager(url) == slotsNumber, Deadline.fromNow(Duration.ofSeconds(30))); + } catch (final TimeoutException e) { + final int currentNumberOfSlots = getNumberOfSlotsPerTaskManager(url); + fail(String.format("Expected slots per TM to be %d, was: %d", slotsNumber, currentNumberOfSlots)); + } + } - Assert.assertTrue("Expect to see new container started", - eC.contains("Launching TaskManager") && eC.contains("on host")); + private static int getNumberOfTaskManagers(final String url) throws Exception { + String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/"); + JsonNode parsedTMs = new ObjectMapper().readTree(response); + ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers"); + return taskManagers == null ? 0 : taskManagers.size(); + } - // cleanup auth for the subsequent tests. - remoteUgi.getTokenIdentifiers().remove(nmIdent); + private static int getNumberOfSlotsPerTaskManager(final String url) throws Exception { Review comment: Here we could also use the `ClusterOverview` and simply divide the number of total slots by the number of TMs. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log in to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services