GitHub user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5901#discussion_r185449426
--- Diff:
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
---
@@ -85,54 +88,75 @@ public void checkForProhibitedLogContents() {
public void testDetachedMode() throws InterruptedException, IOException
{
LOG.info("Starting testDetachedMode()");
addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
- Runner runner =
- startWithArgs(new String[]{"-j",
flinkUberjar.getAbsolutePath(),
- "-t",
flinkLibFolder.getAbsolutePath(),
- "-n", "1",
- "-jm", "768",
- "-tm", "1024",
- "--name", "MyCustomName", //
test setting a custom name
- "--detached"},
- "Flink JobManager is now running on",
RunTypes.YARN_SESSION);
-
+ File exampleJarLocation =
getTestJarPath("StreamingWordCount.jar");
+ // get temporary file for reading input data for wordcount
example
+ File tmpInFile;
+ try {
+ tmpInFile = tmp.newFile();
+ FileUtils.writeStringToFile(tmpInFile,
WordCountData.TEXT);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Runner runner = isNewMode ?
+ startWithArgs(new String[]{"run", "-m",
"yarn-cluster",
+ "-yj",
flinkUberjar.getAbsolutePath(),
+ "-yt",
flinkLibFolder.getAbsolutePath(),
+ "-yn", "1",
+ "-yjm", "768",
+ "-ytm", "1024",
+ "-ynm",
"MyCustomName", // test setting a custom name
+ "--detached",
exampleJarLocation.getAbsolutePath(),
+ "--input",
tmpInFile.getAbsoluteFile().toString()},
+ "Job has been submitted with
JobID", RunTypes.CLI_FRONTEND) :
+ startWithArgs(new String[]{"-j",
flinkUberjar.getAbsolutePath(),
+ "-t",
flinkLibFolder.getAbsolutePath(),
+ "-n", "1",
+ "-jm", "768",
+ "-tm", "1024",
+ "--name",
"MyCustomName", // test setting a custom name
+ "--detached"},
+ "Flink JobManager is now
running on", RunTypes.YARN_SESSION);
// before checking any strings outputted by the CLI, first give
it time to return
runner.join();
- checkForLogString("The Flink YARN client has been started in
detached mode");
- if (!isNewMode) {
- LOG.info("Waiting until two containers are running");
- // wait until two containers are running
- while (getRunningContainers() < 2) {
- sleep(500);
- }
+ LOG.info("Waiting until two containers are running");
+ // wait until two containers are running
+ while (getRunningContainers() < 2) {
+ sleep(500);
}
// additional sleep for the JM/TM to start and establish
connection
long startTime = System.nanoTime();
while (System.nanoTime() - startTime <
TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS) &&
!(verifyStringsInNamedLogFiles(
- new String[]{"YARN Application
Master started"}, "jobmanager.log") &&
+ new String[]{isNewMode ?
"JobManager successfully registered at ResourceManager"
+ : "YARN
Application Master started"}, "jobmanager.log") &&
verifyStringsInNamedLogFiles(
- new
String[]{"Starting TaskManager actor"}, "taskmanager.log"))) {
+ new
String[]{isNewMode ? "Successful registration at job manager"
+
: "Starting TaskManager actor"}, "taskmanager.log"))) {
LOG.info("Still waiting for JM/TM to initialize...");
sleep(500);
}
LOG.info("Two containers are running. Killing the application");
+ // Wait for the cluster to shutdown itself, so it wont get
killed externally during shutdown.
--- End diff --
A fixed-duration sleep invites test instability. Can we use a more
deterministic check instead? For example, wait for the cluster to shut down
by polling the application status, or use a similar condition-based check.
---