[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base URL: https://github.com/apache/flink/pull/7546#discussion_r250224836 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ## @@ -187,178 +188,146 @@ public void perJobYarnClusterOffHeap() throws IOException { } /** -* Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213). +* Starts a session cluster on YARN, and submits a streaming job. +* +* Tests +* +* if a custom YARN application name can be set from the command line, +* if the number of TaskManager slots can be set from the command line, +* if dynamic properties from the command line are set, +* if the vcores are set correctly (FLINK-2213), +* if jobmanager hostname/port are shown in web interface (FLINK-1902) +* +* +* Hint: If you think it is a good idea to add more assertions to this test, think again! */ - @Test(timeout = 10) // timeout after 100 seconds - public void testTaskManagerFailure() throws Exception { - assumeTrue("The new mode does not start TMs upfront.", !isNewMode); - LOG.info("Starting testTaskManagerFailure()"); - Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", + @Test(timeout = 100_000) + public void testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots() throws Exception { + final Runner yarnSessionClusterRunner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-jm", "768m", "-tm", "1024m", "-s", "3", // set the slots 3 to check if the vCores are set properly! 
"-nm", "customName", "-Dfancy-configuration-value=veryFancy", "-Dyarn.maximum-failed-containers=3", "-D" + YarnConfigOptions.VCORES.key() + "=2"}, - "Number of connected TaskManagers changed to 1. Slots available: 3", + "Flink JobManager is now running on ", RunTypes.YARN_SESSION); - Assert.assertEquals(2, getRunningContainers()); + final String logs = outContent.toString(); + final String hostname = parseJobManagerHostname(logs); + LOG.info("Extracted hostname: {}", hostname); - // Test if JobManager web interface is accessible --- + final ApplicationReport applicationReport = getOnlyApplicationReport(); + final String restApiBaseUrl = normalizeTrackingUrl(applicationReport.getTrackingUrl()); + LOG.info("Got application URL from YARN {}", restApiBaseUrl); - final YarnClient yc = YarnClient.createYarnClient(); - yc.init(YARN_CONFIGURATION); - yc.start(); + submitJob("WindowJoin.jar"); + waitForTaskManagerRegistration(restApiBaseUrl, Duration.ofMillis(30_000)); - List apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); - Assert.assertEquals(1, apps.size()); // Only one running - ApplicationReport app = apps.get(0); - Assert.assertEquals("customName", app.getName()); - String url = app.getTrackingUrl(); - if (!url.endsWith("/")) { - url += "/"; - } - if (!url.startsWith("http://")) { - url = "http://" + url; - } - LOG.info("Got application URL from YARN {}", url); + // + // Assert that custom YARN application name "customName" is set + // + assertEquals("customName", applicationReport.getName()); - String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/"); + // + // Assert the number of TaskManager slots are set + // + assertNumberOfSlotsPerTask(restApiBaseUrl, 3); - JsonNode parsedTMs = new ObjectMapper().readTree(response); - ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers"); - Assert.assertNotNull(taskManagers); - Assert.assertEquals(1, taskManagers.size()); - Assert.assertEquals(3,
[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base URL: https://github.com/apache/flink/pull/7546#discussion_r250225736 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ## @@ -187,178 +188,146 @@ public void perJobYarnClusterOffHeap() throws IOException { } /** -* Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213). +* Starts a session cluster on YARN, and submits a streaming job. +* +* Tests +* +* if a custom YARN application name can be set from the command line, +* if the number of TaskManager slots can be set from the command line, +* if dynamic properties from the command line are set, +* if the vcores are set correctly (FLINK-2213), +* if jobmanager hostname/port are shown in web interface (FLINK-1902) +* +* +* Hint: If you think it is a good idea to add more assertions to this test, think again! */ - @Test(timeout = 10) // timeout after 100 seconds - public void testTaskManagerFailure() throws Exception { - assumeTrue("The new mode does not start TMs upfront.", !isNewMode); - LOG.info("Starting testTaskManagerFailure()"); - Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), - "-n", "1", + @Test(timeout = 100_000) + public void testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots() throws Exception { + final Runner yarnSessionClusterRunner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-jm", "768m", "-tm", "1024m", "-s", "3", // set the slots 3 to check if the vCores are set properly! 
"-nm", "customName", "-Dfancy-configuration-value=veryFancy", "-Dyarn.maximum-failed-containers=3", "-D" + YarnConfigOptions.VCORES.key() + "=2"}, - "Number of connected TaskManagers changed to 1. Slots available: 3", + "Flink JobManager is now running on ", RunTypes.YARN_SESSION); - Assert.assertEquals(2, getRunningContainers()); + final String logs = outContent.toString(); + final String hostname = parseJobManagerHostname(logs); + LOG.info("Extracted hostname: {}", hostname); - // Test if JobManager web interface is accessible --- + final ApplicationReport applicationReport = getOnlyApplicationReport(); + final String restApiBaseUrl = normalizeTrackingUrl(applicationReport.getTrackingUrl()); + LOG.info("Got application URL from YARN {}", restApiBaseUrl); - final YarnClient yc = YarnClient.createYarnClient(); - yc.init(YARN_CONFIGURATION); - yc.start(); + submitJob("WindowJoin.jar"); + waitForTaskManagerRegistration(restApiBaseUrl, Duration.ofMillis(30_000)); - List apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)); - Assert.assertEquals(1, apps.size()); // Only one running - ApplicationReport app = apps.get(0); - Assert.assertEquals("customName", app.getName()); - String url = app.getTrackingUrl(); - if (!url.endsWith("/")) { - url += "/"; - } - if (!url.startsWith("http://")) { - url = "http://" + url; - } - LOG.info("Got application URL from YARN {}", url); + // + // Assert that custom YARN application name "customName" is set + // + assertEquals("customName", applicationReport.getName()); - String response = TestBaseUtils.getFromHTTP(url + "taskmanagers/"); + // + // Assert the number of TaskManager slots are set + // + assertNumberOfSlotsPerTask(restApiBaseUrl, 3); - JsonNode parsedTMs = new ObjectMapper().readTree(response); - ArrayNode taskManagers = (ArrayNode) parsedTMs.get("taskmanagers"); - Assert.assertNotNull(taskManagers); - Assert.assertEquals(1, taskManagers.size()); - Assert.assertEquals(3,
[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base URL: https://github.com/apache/flink/pull/7546#discussion_r250227506 ## File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ## @@ -124,24 +135,84 @@ public void testKillYarnSessionClusterEntrypoint() throws Exception { final ApplicationId id = restClusterClient.getClusterId(); - waitUntilJobIsRunning(restClusterClient, jobId, RETRY_TIMEOUT); + waitUntilJobIsRunning(restClusterClient, jobId); killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint()); + waitForApplicationAttempt(id, 2); + waitUntilJobIsRunning(restClusterClient, jobId); + + killApplicationAndWait(id); + } + + @Test + public void testJobRecoversAfterKillingTaskManager() throws Exception { + final RestClusterClient restClusterClient = deploySessionCluster(setupYarnClusterDescriptor()); + final JobID jobId = submitJob(restClusterClient, createJobGraph()); + final ApplicationId id = restClusterClient.getClusterId(); + waitUntilJobIsRunning(restClusterClient, jobId); + + stopTaskManagerContainer(); + waitUntilJobIsRunning(restClusterClient, jobId); Review comment: Can it happen that the JobManager does not realize that a `TM` has died before we call `waitUntilJobIsRunning`? Is there an easy way to check whether we have restarted the job? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services