tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] 
Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
URL: https://github.com/apache/flink/pull/7546#discussion_r250225736
 
 

 ##########
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 ##########
 @@ -187,178 +188,146 @@ public void perJobYarnClusterOffHeap() throws 
IOException {
        }
 
        /**
-        * Test TaskManager failure and also if the vcores are set correctly 
(see issue FLINK-2213).
+        * Starts a session cluster on YARN, and submits a streaming job.
+        *
+        * <p>Tests
+        * <ul>
+        * <li>if a custom YARN application name can be set from the command 
line,
+        * <li>if the number of TaskManager slots can be set from the command 
line,
+        * <li>if dynamic properties from the command line are set,
+        * <li>if the vcores are set correctly (FLINK-2213),
+        * <li>if jobmanager hostname/port are shown in web interface 
(FLINK-1902)
+        * </ul>
+        *
+        * <p><b>Hint: </b> If you think it is a good idea to add more 
assertions to this test, think again!
         */
-       @Test(timeout = 100000) // timeout after 100 seconds
-       public void testTaskManagerFailure() throws Exception {
-               assumeTrue("The new mode does not start TMs upfront.", 
!isNewMode);
-               LOG.info("Starting testTaskManagerFailure()");
-               Runner runner = startWithArgs(new String[]{"-j", 
flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-                               "-n", "1",
+       @Test(timeout = 100_000)
+       public void 
testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots()
 throws Exception {
+               final Runner yarnSessionClusterRunner = startWithArgs(new 
String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", 
flinkLibFolder.getAbsolutePath(),
                                "-jm", "768m",
                                "-tm", "1024m",
                                "-s", "3", // set the slots 3 to check if the 
vCores are set properly!
                                "-nm", "customName",
                                "-Dfancy-configuration-value=veryFancy",
                                "-Dyarn.maximum-failed-containers=3",
                                "-D" + YarnConfigOptions.VCORES.key() + "=2"},
-                       "Number of connected TaskManagers changed to 1. Slots 
available: 3",
+                       "Flink JobManager is now running on ",
                        RunTypes.YARN_SESSION);
 
-               Assert.assertEquals(2, getRunningContainers());
+               final String logs = outContent.toString();
+               final String hostname = parseJobManagerHostname(logs);
+               LOG.info("Extracted hostname: {}", hostname);
 
-               // ------------------------ Test if JobManager web interface is 
accessible -------
+               final ApplicationReport applicationReport = 
getOnlyApplicationReport();
+               final String restApiBaseUrl = 
normalizeTrackingUrl(applicationReport.getTrackingUrl());
+               LOG.info("Got application URL from YARN {}", restApiBaseUrl);
 
-               final YarnClient yc = YarnClient.createYarnClient();
-               yc.init(YARN_CONFIGURATION);
-               yc.start();
+               submitJob("WindowJoin.jar");
+               waitForTaskManagerRegistration(restApiBaseUrl, 
Duration.ofMillis(30_000));
 
-               List<ApplicationReport> apps = 
yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-               Assert.assertEquals(1, apps.size()); // Only one running
-               ApplicationReport app = apps.get(0);
-               Assert.assertEquals("customName", app.getName());
-               String url = app.getTrackingUrl();
-               if (!url.endsWith("/")) {
-                       url += "/";
-               }
-               if (!url.startsWith("http://")) {
-                       url = "http://" + url;
-               }
-               LOG.info("Got application URL from YARN {}", url);
+               //
+               // Assert that custom YARN application name "customName" is set
+               //
+               assertEquals("customName", applicationReport.getName());
 
-               String response = TestBaseUtils.getFromHTTP(url + 
"taskmanagers/");
+               //
+               // Assert the number of TaskManager slots are set
+               //
+               assertNumberOfSlotsPerTask(restApiBaseUrl, 3);
 
-               JsonNode parsedTMs = new ObjectMapper().readTree(response);
-               ArrayNode taskManagers = (ArrayNode) 
parsedTMs.get("taskmanagers");
-               Assert.assertNotNull(taskManagers);
-               Assert.assertEquals(1, taskManagers.size());
-               Assert.assertEquals(3, 
taskManagers.get(0).get("slotsNumber").asInt());
+               final Map<String, String> flinkConfig = 
getFlinkConfig(restApiBaseUrl);
 
-               // get the configuration from webinterface & check if the 
dynamic properties from YARN show up there.
-               String jsonConfig = TestBaseUtils.getFromHTTP(url + 
"jobmanager/config");
-               Map<String, String> parsedConfig = 
WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);
+               //
+               // Assert dynamic properties
+               //
+               assertThat(flinkConfig, hasEntry("fancy-configuration-value", 
"veryFancy"));
+               assertThat(flinkConfig, 
hasEntry("yarn.maximum-failed-containers", "3"));
+
+               //
+               // FLINK-2213: assert that vcores are set
+               //
+               assertThat(flinkConfig, 
hasEntry(YarnConfigOptions.VCORES.key(), "2"));
 
-               Assert.assertEquals("veryFancy", 
parsedConfig.get("fancy-configuration-value"));
-               Assert.assertEquals("3", 
parsedConfig.get("yarn.maximum-failed-containers"));
-               Assert.assertEquals("2", 
parsedConfig.get(YarnConfigOptions.VCORES.key()));
+               //
+               // FLINK-1902: check if jobmanager hostname is shown in web 
interface
+               //
+               assertThat(flinkConfig, 
hasEntry(JobManagerOptions.ADDRESS.key(), hostname));
 
-               // -------------- FLINK-1902: check if jobmanager hostname/port 
are shown in web interface
-               // first, get the hostname/port
-               String oC = outContent.toString();
-               Pattern p = Pattern.compile("Flink JobManager is now running on 
([a-zA-Z0-9.-]+):([0-9]+)");
-               Matcher matches = p.matcher(oC);
+               yarnSessionClusterRunner.sendStop();
+               yarnSessionClusterRunner.join();
+       }
+
+       private static String parseJobManagerHostname(final String logs) {
+               final Pattern p = Pattern.compile("Flink JobManager is now 
running on ([a-zA-Z0-9.-]+):([0-9]+)");
+               final Matcher matches = p.matcher(logs);
                String hostname = null;
-               String port = null;
+
                while (matches.find()) {
                        hostname = matches.group(1).toLowerCase();
-                       port = matches.group(2);
                }
-               LOG.info("Extracted hostname:port: {} {}", hostname, port);
-
-               Assert.assertEquals("unable to find hostname in " + jsonConfig, 
hostname,
-                       parsedConfig.get(JobManagerOptions.ADDRESS.key()));
-               Assert.assertEquals("unable to find port in " + jsonConfig, 
port,
-                       parsedConfig.get(JobManagerOptions.PORT.key()));
-
-               // test logfile access
-               String logs = TestBaseUtils.getFromHTTP(url + "jobmanager/log");
-               Assert.assertTrue(logs.contains("Starting YARN 
ApplicationMaster"));
-               Assert.assertTrue(logs.contains("Starting JobManager"));
-               Assert.assertTrue(logs.contains("Starting JobManager Web 
Frontend"));
-
-               // ------------------------ Kill container with TaskManager and 
check if vcores are set correctly -------
-
-               // find container id of taskManager:
-               ContainerId taskManagerContainer = null;
-               NodeManager nodeManager = null;
-               UserGroupInformation remoteUgi = null;
-               NMTokenIdentifier nmIdent = null;
-               try {
-                       remoteUgi = UserGroupInformation.getCurrentUser();
-               } catch (IOException e) {
-                       LOG.warn("Unable to get curr user", e);
-                       Assert.fail();
-               }
-               for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
-                       NodeManager nm = yarnCluster.getNodeManager(nmId);
-                       ConcurrentMap<ContainerId, Container> containers = 
nm.getNMContext().getContainers();
-                       for (Map.Entry<ContainerId, Container> entry : 
containers.entrySet()) {
-                               String command = 
StringUtils.join(entry.getValue().getLaunchContext().getCommands(), " ");
-                               if 
(command.contains(YarnTaskManager.class.getSimpleName())) {
-                                       taskManagerContainer = entry.getKey();
-                                       nodeManager = nm;
-                                       nmIdent = new 
NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "", 0);
-                                       // allow myself to do stuff with the 
container
-                                       // 
remoteUgi.addCredentials(entry.getValue().getCredentials());
-                                       remoteUgi.addTokenIdentifier(nmIdent);
-                               }
-                       }
-                       sleep(500);
-               }
-
-               Assert.assertNotNull("Unable to find container with 
TaskManager", taskManagerContainer);
-               Assert.assertNotNull("Illegal state", nodeManager);
 
-               yc.stop();
+               assertNotNull("hostname not found in log", hostname);
 
-               List<ContainerId> toStop = new LinkedList<ContainerId>();
-               toStop.add(taskManagerContainer);
-               StopContainersRequest scr = 
StopContainersRequest.newInstance(toStop);
+               return hostname;
+       }
 
-               try {
-                       
nodeManager.getNMContext().getContainerManager().stopContainers(scr);
-               } catch (Throwable e) {
-                       LOG.warn("Error stopping container", e);
-                       Assert.fail("Error stopping container: " + 
e.getMessage());
-               }
+       private ApplicationReport getOnlyApplicationReport() throws 
IOException, YarnException {
+               final YarnClient yarnClient = getYarnClient();
+               checkState(yarnClient != null);
 
-               // stateful termination check:
-               // wait until we saw a container being killed and AFTERWARDS a 
new one launched
-               boolean ok = false;
-               do {
-                       LOG.debug("Waiting for correct order of events. Output: 
{}", errContent.toString());
-
-                       String o = errContent.toString();
-                       int killedOff = o.indexOf("Container killed by the 
ApplicationMaster");
-                       if (killedOff != -1) {
-                               o = o.substring(killedOff);
-                               ok = o.indexOf("Launching TaskManager") > 0;
-                       }
-                       sleep(1000);
-               } while(!ok);
+               final List<ApplicationReport> apps = 
yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+               assertEquals(1, apps.size()); // Only one running
+               return apps.get(0);
+       }
 
-               // send "stop" command to command line interface
-               runner.sendStop();
-               // wait for the thread to stop
-               try {
-                       runner.join();
-               } catch (InterruptedException e) {
-                       LOG.warn("Interrupted while stopping runner", e);
+       private static String normalizeTrackingUrl(final String trackingUrl) {
+               String url = trackingUrl;
+               if (!url.endsWith("/")) {
+                       url += "/";
+               }
+               if (!url.startsWith("http://")) {
+                       url = "http://" + url;
                }
-               LOG.warn("stopped");
+               return url;
+       }
 
-               // ----------- Send output to logger
-               System.setOut(ORIGINAL_STDOUT);
-               System.setErr(ORIGINAL_STDERR);
-               oC = outContent.toString();
-               String eC = errContent.toString();
-               LOG.info("Sending stdout content through logger: \n\n{}\n\n", 
oC);
-               LOG.info("Sending stderr content through logger: \n\n{}\n\n", 
eC);
+       private void submitJob(final String jobFileName) throws IOException, 
InterruptedException {
+               Runner jobRunner = startWithArgs(new String[]{"run",
+                               "--detached", 
getTestJarPath(jobFileName).getAbsolutePath()},
+                       "Job has been submitted with JobID", 
RunTypes.CLI_FRONTEND);
+               jobRunner.join();
+       }
 
-               // ------ Check if everything happened correctly
-               Assert.assertTrue("Expect to see failed container",
-                       eC.contains("New messages from the YARN cluster"));
+       private static void waitForTaskManagerRegistration(final String url, 
final Duration waitDuration) throws Exception {
+               waitUntilCondition(() -> getNumberOfTaskManagers(url) > 0, 
Deadline.fromNow(waitDuration));
+       }
 
-               Assert.assertTrue("Expect to see failed container",
-                       eC.contains("Container killed by the 
ApplicationMaster"));
+       private static void assertNumberOfSlotsPerTask(final String url, final 
int slotsNumber) throws Exception {
+               try {
+                       waitUntilCondition(() -> 
getNumberOfSlotsPerTaskManager(url) == slotsNumber, 
Deadline.fromNow(Duration.ofSeconds(30)));
+               } catch (final TimeoutException e) {
+                       final int currentNumberOfSlots = 
getNumberOfSlotsPerTaskManager(url);
+                       fail(String.format("Expected slots per TM to be %d, 
was: %d", slotsNumber, currentNumberOfSlots));
+               }
+       }
 
-               Assert.assertTrue("Expect to see new container started",
-                       eC.contains("Launching TaskManager") && eC.contains("on 
host"));
+       private static int getNumberOfTaskManagers(final String url) throws 
Exception {
+               String response = TestBaseUtils.getFromHTTP(url + 
"taskmanagers/");
+               JsonNode parsedTMs = new ObjectMapper().readTree(response);
+               ArrayNode taskManagers = (ArrayNode) 
parsedTMs.get("taskmanagers");
+               return taskManagers == null ? 0 : taskManagers.size();
+       }
 
-               // cleanup auth for the subsequent tests.
-               remoteUgi.getTokenIdentifiers().remove(nmIdent);
+       private static int getNumberOfSlotsPerTaskManager(final String url) 
throws Exception {
 
 Review comment:
   Here we could also use the `ClusterOverview` and simply divide the number of 
total slots by the number of TMs.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to