[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] 
Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
URL: https://github.com/apache/flink/pull/7546#discussion_r250224836
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 ##
 @@ -187,178 +188,146 @@ public void perJobYarnClusterOffHeap() throws 
IOException {
}
 
/**
-* Test TaskManager failure and also if the vcores are set correctly 
(see issue FLINK-2213).
+* Starts a session cluster on YARN, and submits a streaming job.
+*
+* Tests
+* 
+* if a custom YARN application name can be set from the command 
line,
+* if the number of TaskManager slots can be set from the command 
line,
+* if dynamic properties from the command line are set,
+* if the vcores are set correctly (FLINK-2213),
+* if jobmanager hostname/port are shown in web interface 
(FLINK-1902)
+* 
+*
+* Hint:  If you think it is a good idea to add more 
assertions to this test, think again!
 */
-   @Test(timeout = 10) // timeout after 100 seconds
-   public void testTaskManagerFailure() throws Exception {
-   assumeTrue("The new mode does not start TMs upfront.", 
!isNewMode);
-   LOG.info("Starting testTaskManagerFailure()");
-   Runner runner = startWithArgs(new String[]{"-j", 
flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-   "-n", "1",
+   @Test(timeout = 100_000)
+   public void 
testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots()
 throws Exception {
+   final Runner yarnSessionClusterRunner = startWithArgs(new 
String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", 
flinkLibFolder.getAbsolutePath(),
"-jm", "768m",
"-tm", "1024m",
"-s", "3", // set the slots 3 to check if the 
vCores are set properly!
"-nm", "customName",
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3",
"-D" + YarnConfigOptions.VCORES.key() + "=2"},
-   "Number of connected TaskManagers changed to 1. Slots 
available: 3",
+   "Flink JobManager is now running on ",
RunTypes.YARN_SESSION);
 
-   Assert.assertEquals(2, getRunningContainers());
+   final String logs = outContent.toString();
+   final String hostname = parseJobManagerHostname(logs);
+   LOG.info("Extracted hostname: {}", hostname);
 
-   //  Test if JobManager web interface is 
accessible ---
+   final ApplicationReport applicationReport = 
getOnlyApplicationReport();
+   final String restApiBaseUrl = 
normalizeTrackingUrl(applicationReport.getTrackingUrl());
+   LOG.info("Got application URL from YARN {}", restApiBaseUrl);
 
-   final YarnClient yc = YarnClient.createYarnClient();
-   yc.init(YARN_CONFIGURATION);
-   yc.start();
+   submitJob("WindowJoin.jar");
+   waitForTaskManagerRegistration(restApiBaseUrl, 
Duration.ofMillis(30_000));
 
-   List apps = 
yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-   Assert.assertEquals(1, apps.size()); // Only one running
-   ApplicationReport app = apps.get(0);
-   Assert.assertEquals("customName", app.getName());
-   String url = app.getTrackingUrl();
-   if (!url.endsWith("/")) {
-   url += "/";
-   }
-   if (!url.startsWith("http://")) {
-   url = "http://" + url;
-   }
-   LOG.info("Got application URL from YARN {}", url);
+   //
+   // Assert that custom YARN application name "customName" is set
+   //
+   assertEquals("customName", applicationReport.getName());
 
-   String response = TestBaseUtils.getFromHTTP(url + 
"taskmanagers/");
+   //
+   // Assert the number of TaskManager slots are set
+   //
+   assertNumberOfSlotsPerTask(restApiBaseUrl, 3);
 
-   JsonNode parsedTMs = new ObjectMapper().readTree(response);
-   ArrayNode taskManagers = (ArrayNode) 
parsedTMs.get("taskmanagers");
-   Assert.assertNotNull(taskManagers);
-   Assert.assertEquals(1, taskManagers.size());
-   Assert.assertEquals(3, 

[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] 
Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
URL: https://github.com/apache/flink/pull/7546#discussion_r250225736
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 ##
 @@ -187,178 +188,146 @@ public void perJobYarnClusterOffHeap() throws 
IOException {
}
 
/**
-* Test TaskManager failure and also if the vcores are set correctly 
(see issue FLINK-2213).
+* Starts a session cluster on YARN, and submits a streaming job.
+*
+* Tests
+* 
+* if a custom YARN application name can be set from the command 
line,
+* if the number of TaskManager slots can be set from the command 
line,
+* if dynamic properties from the command line are set,
+* if the vcores are set correctly (FLINK-2213),
+* if jobmanager hostname/port are shown in web interface 
(FLINK-1902)
+* 
+*
+* Hint:  If you think it is a good idea to add more 
assertions to this test, think again!
 */
-   @Test(timeout = 10) // timeout after 100 seconds
-   public void testTaskManagerFailure() throws Exception {
-   assumeTrue("The new mode does not start TMs upfront.", 
!isNewMode);
-   LOG.info("Starting testTaskManagerFailure()");
-   Runner runner = startWithArgs(new String[]{"-j", 
flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-   "-n", "1",
+   @Test(timeout = 100_000)
+   public void 
testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots()
 throws Exception {
+   final Runner yarnSessionClusterRunner = startWithArgs(new 
String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", 
flinkLibFolder.getAbsolutePath(),
"-jm", "768m",
"-tm", "1024m",
"-s", "3", // set the slots 3 to check if the 
vCores are set properly!
"-nm", "customName",
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3",
"-D" + YarnConfigOptions.VCORES.key() + "=2"},
-   "Number of connected TaskManagers changed to 1. Slots 
available: 3",
+   "Flink JobManager is now running on ",
RunTypes.YARN_SESSION);
 
-   Assert.assertEquals(2, getRunningContainers());
+   final String logs = outContent.toString();
+   final String hostname = parseJobManagerHostname(logs);
+   LOG.info("Extracted hostname: {}", hostname);
 
-   //  Test if JobManager web interface is 
accessible ---
+   final ApplicationReport applicationReport = 
getOnlyApplicationReport();
+   final String restApiBaseUrl = 
normalizeTrackingUrl(applicationReport.getTrackingUrl());
+   LOG.info("Got application URL from YARN {}", restApiBaseUrl);
 
-   final YarnClient yc = YarnClient.createYarnClient();
-   yc.init(YARN_CONFIGURATION);
-   yc.start();
+   submitJob("WindowJoin.jar");
+   waitForTaskManagerRegistration(restApiBaseUrl, 
Duration.ofMillis(30_000));
 
-   List apps = 
yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-   Assert.assertEquals(1, apps.size()); // Only one running
-   ApplicationReport app = apps.get(0);
-   Assert.assertEquals("customName", app.getName());
-   String url = app.getTrackingUrl();
-   if (!url.endsWith("/")) {
-   url += "/";
-   }
-   if (!url.startsWith("http://")) {
-   url = "http://" + url;
-   }
-   LOG.info("Got application URL from YARN {}", url);
+   //
+   // Assert that custom YARN application name "customName" is set
+   //
+   assertEquals("customName", applicationReport.getName());
 
-   String response = TestBaseUtils.getFromHTTP(url + 
"taskmanagers/");
+   //
+   // Assert the number of TaskManager slots are set
+   //
+   assertNumberOfSlotsPerTask(restApiBaseUrl, 3);
 
-   JsonNode parsedTMs = new ObjectMapper().readTree(response);
-   ArrayNode taskManagers = (ArrayNode) 
parsedTMs.get("taskmanagers");
-   Assert.assertNotNull(taskManagers);
-   Assert.assertEquals(1, taskManagers.size());
-   Assert.assertEquals(3, 

[GitHub] tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base

2019-01-23 Thread GitBox
tillrohrmann commented on a change in pull request #7546: [FLINK-11390][tests] 
Port YARNSessionCapacitySchedulerITCase#testTaskManagerFailure to new code base
URL: https://github.com/apache/flink/pull/7546#discussion_r250227506
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 ##
 @@ -124,24 +135,84 @@ public void testKillYarnSessionClusterEntrypoint() 
throws Exception {
 
final ApplicationId id = restClusterClient.getClusterId();
 
-   waitUntilJobIsRunning(restClusterClient, jobId, RETRY_TIMEOUT);
+   waitUntilJobIsRunning(restClusterClient, jobId);
 

killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
+   waitForApplicationAttempt(id, 2);
 
+   waitUntilJobIsRunning(restClusterClient, jobId);
+
+   killApplicationAndWait(id);
+   }
+
+   @Test
+   public void testJobRecoversAfterKillingTaskManager() throws Exception {
+   final RestClusterClient restClusterClient = 
deploySessionCluster(setupYarnClusterDescriptor());
+   final JobID jobId = submitJob(restClusterClient, 
createJobGraph());
+   final ApplicationId id = restClusterClient.getClusterId();
+   waitUntilJobIsRunning(restClusterClient, jobId);
+
+   stopTaskManagerContainer();
+   waitUntilJobIsRunning(restClusterClient, jobId);
 
 Review comment:
   Can it happen that the JobManager does not realize that a `TM` has died 
before we call `waitUntilJobIsRunning`? Is there an easy way to check whether 
we have restarted the job?
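
   One possible way to close that gap, sketched under assumptions: before calling `waitUntilJobIsRunning` the second time, the test could first wait until some restart indicator for the job has moved past the value it had before the container was stopped. The `RestartCheck` class, the `waitForRestart` method and the `IntSupplier` that reports the restart count are all hypothetical names introduced for illustration; how the count is actually obtained (for example from a metric or a REST call) is not part of this PR and is left open here.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;

// Hypothetical helper, not part of the Flink code base.
final class RestartCheck {

    private RestartCheck() {
    }

    /**
     * Polls the given restart counter until it is greater than the baseline
     * that was observed before the TaskManager container was stopped.
     *
     * @return the first observed counter value that exceeds the baseline
     * @throws TimeoutException if no restart is observed within the timeout
     */
    public static int waitForRestart(
            IntSupplier restartCount,
            int baseline,
            Duration timeout,
            Duration pollInterval) throws InterruptedException, TimeoutException {

        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            final int current = restartCount.getAsInt();
            if (current > baseline) {
                // The job has been restarted at least once since the baseline was taken.
                return current;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(
                    "No restart observed within " + timeout + " (count stayed at " + current + ")");
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }
}
```

   In the test this would sit between `stopTaskManagerContainer()` and the second `waitUntilJobIsRunning(...)`, with the baseline read before the container is stopped. That way a job that is still reported as running only because the failure has not been detected yet cannot satisfy the final wait.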

