[ https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279627#comment-17279627 ]
Jichao Wang edited comment on FLINK-21274 at 2/5/21, 11:00 AM:
---------------------------------------------------------------

We can also modify {color:#0747a6}org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint#main{color}. Compared with the approach I mentioned at the beginning, this one requires modifying multiple main methods (e.g. {color:#0747a6}org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint#main{color}, {color:#0747a6}org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint{color}#main, and so on).

org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint#main
{code:java}
public static void main(String[] args) {
    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    Map<String, String> env = System.getenv();

    final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
    Preconditions.checkArgument(
        workingDirectory != null,
        "Working directory variable (%s) not set",
        ApplicationConstants.Environment.PWD.key());

    try {
        YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
    } catch (IOException e) {
        LOG.warn("Could not log YARN environment information.", e);
    }

    Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env);

    YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration);

    ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);

    // Block the main thread until the termination future completes.
    int returnCode;
    Throwable throwable = null;
    try {
        returnCode = yarnJobClusterEntrypoint.getTerminationFuture().get().processExitCode();
    } catch (Throwable e) {
        throwable = e;
        returnCode = RUNTIME_FAILURE_RETURN_CODE;
    }

    LOG.info("Terminating cluster entrypoint process {} with exit code {}.",
        yarnJobClusterEntrypoint.getClass().getSimpleName(), returnCode, throwable);
    System.exit(returnCode);
}
{code}
All my modifications are meant to make the main thread exit last. Is there a better way to solve this problem?

> At per-job mode,if the HDFS write is slow(about 5 seconds), the flink job archive file will upload fails
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21274
>                 URL: https://issues.apache.org/jira/browse/FLINK-21274
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.1
>            Reporter: Jichao Wang
>            Priority: Critical
>         Attachments: 1.png, 2.png, Add wait 5 seconds in org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log, Not add wait 5 seconds.log, application_1612404624605_0010-JobManager.log
>
>
> This is part of the configuration (flink-conf.yaml) of my Flink
> HistoryServer; my Flink client uses the same configuration.
> {code:java}
> jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> {code}
> I used {color:#0747a6}flink run -m yarn-cluster /cloud/service/flink/examples/batch/WordCount.jar{color}
> to submit a WordCount job to the Yarn cluster. Under normal circumstances,
> after the job completes, its execution information is archived to HDFS, and
> then the JobManager process exits. However, when archiving takes a long time
> (for example, because HDFS writes are slow), the upload of the job archive
> file fails.
> The problem can be reproduced as follows:
> Modify the
> {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
> method to wait 5 seconds before actually writing to HDFS (simulating slow
> writes).
> {code:java}
> public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
>         throws IOException {
>     try {
>         FileSystem fs = rootPath.getFileSystem();
>         Path path = new Path(rootPath, jobId.toString());
>         OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
>
>         // Added for reproduction only: delay the write to simulate slow HDFS.
>         try {
>             LOG.info("===========================Wait 5 seconds..");
>             Thread.sleep(5000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>
>         try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
>             ... // Part of the code is omitted here
>         } catch (Exception e) {
>             fs.delete(path, false);
>             throw e;
>         }
>         LOG.info("Job {} has been archived at {}.", jobId, path);
>         return path;
>     } catch (IOException e) {
>         LOG.error("Failed to archive job.", e);
>         throw e;
>     }
> }
> {code}
> After making the above change, I cannot find the corresponding job on
> Flink's HistoryServer (see attachments 1.png and 2.png).
> I then browsed the JobManager log on Yarn (see attachment
> application_1612404624605_0010-JobManager.log for details) and found that
> the following line is missing from it:
> {code:java}
> INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 0.{code}
> When the job exits normally, a line like this is printed just before
> {color:#0747a6}System.exit(returnCode){color} is executed.
> The JobManager log contains no exception either. Together, this indicates
> that at some point no user thread was left in the JobManager process, so the
> JVM exited before the normal shutdown sequence had completed.
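The early exit described above matches standard JVM behavior: the JVM shuts down once only daemon threads remain, and a callback registered on a not-yet-completed CompletableFuture runs on whichever thread completes it. The following is a minimal standalone sketch of that second point, not Flink code; the class name, method name, and thread name are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DaemonCallbackSketch {

    /**
     * Completes a future from a daemon-thread executor and reports whether the
     * dependent callback ran on a daemon thread.
     */
    static boolean callbackRunsOnDaemonThread() throws Exception {
        // Hypothetical stand-in for an executor whose threads are daemon threads.
        ExecutorService daemonExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "daemon-io");
            t.setDaemon(true);
            return t;
        });
        try {
            CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

            // Registered before completion, so it runs on the completing thread.
            CompletableFuture<Boolean> ranOnDaemon =
                terminationFuture.thenApply(ignored -> Thread.currentThread().isDaemon());

            daemonExecutor.execute(() -> terminationFuture.complete(null));
            return ranOnDaemon.get();
        } finally {
            daemonExecutor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("callback ran on daemon thread: " + callbackRunsOnDaemonThread());
    }
}
```

If the thread running such a callback is a daemon thread and no user thread is alive, the JVM does not wait for the callback to finish, which is consistent with the missing log line.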
> Eventually I found that
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color}
> shuts down several services asynchronously (for example ioExecutor,
> metricRegistry, and commonRpcService), and that the shutdown() method of
> metricRegistry in turn shuts down further services (for example its
> executor). These shutdown actions run asynchronously and in parallel. If
> ioExecutor or executor is the last to shut down, the problem above occurs.
> The root cause is that the threads in these two executors are daemon
> threads, while the threads of the Akka actors are user threads.
> I would like to modify the following code to fix this bug. If this is
> confirmed to be a bug (it affects all versions above 1.9), please assign the
> ticket to me. Thank you.
> Only the
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color}
> method needs to be modified:
> After fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>     final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>     try {
>         clusterEntrypoint.startCluster();
>     } catch (ClusterEntrypointException e) {
>         LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>         System.exit(STARTUP_FAILURE_RETURN_CODE);
>     }
>
>     // Block the calling (main) thread until the termination future completes.
>     int returnCode;
>     Throwable throwable = null;
>     try {
>         returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
>     } catch (Throwable e) {
>         throwable = e;
>         returnCode = RUNTIME_FAILURE_RETURN_CODE;
>     }
>
>     LOG.info("Terminating cluster entrypoint process {} with exit code {}.",
>         clusterEntrypointName, returnCode, throwable);
>     System.exit(returnCode);
> }{code}
> Before fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>     final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>     try {
>         clusterEntrypoint.startCluster();
>     } catch (ClusterEntrypointException e) {
>         LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>         System.exit(STARTUP_FAILURE_RETURN_CODE);
>     }
>
>     // The exit logic runs in a callback on whichever thread completes the future.
>     clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
>         final int returnCode;
>         if (throwable != null) {
>             returnCode = RUNTIME_FAILURE_RETURN_CODE;
>         } else {
>             returnCode = applicationStatus.processExitCode();
>         }
>
>         LOG.info("Terminating cluster entrypoint process {} with exit code {}.",
>             clusterEntrypointName, returnCode, throwable);
>         System.exit(returnCode);
>     });
> }
> {code}
> The purpose of the modification is to ensure that the main thread exits last.
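The idea behind the proposed change can be tried outside Flink. The sketch below is a simplified illustration under stated assumptions, not the actual ClusterEntrypoint code: the class, the awaitExitCode helper, and the RUNTIME_FAILURE constant with its value are hypothetical, and an Integer future stands in for the termination future. It shows the main thread (a user thread) blocking on the future, so the process stays alive until the exit code is known even when the future is completed from a daemon thread.

```java
import java.util.concurrent.CompletableFuture;

public class BlockingExitSketch {

    // Placeholder failure code for this sketch only.
    static final int RUNTIME_FAILURE = 2;

    /** Blocks the calling thread until the future completes and maps the outcome to an exit code. */
    static int awaitExitCode(CompletableFuture<Integer> terminationFuture) {
        try {
            return terminationFuture.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report a failure.
            Thread.currentThread().interrupt();
            return RUNTIME_FAILURE;
        } catch (Exception e) {
            // Covers exceptional completion and cancellation of the future.
            return RUNTIME_FAILURE;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> terminationFuture = new CompletableFuture<>();

        // A daemon thread simulates a slow asynchronous shutdown step.
        Thread shutdownStep = new Thread(() -> {
            try {
                Thread.sleep(500);
                terminationFuture.complete(0);
            } catch (InterruptedException e) {
                terminationFuture.completeExceptionally(e);
            }
        }, "slow-shutdown-step");
        shutdownStep.setDaemon(true);
        shutdownStep.start();

        // The main thread waits here, so the JVM cannot exit before the step finishes.
        int returnCode = awaitExitCode(terminationFuture);
        System.out.println("Terminating with exit code " + returnCode);
        System.exit(returnCode);
    }
}
```

Whether blocking in runClusterEntrypoint is the right fix for Flink itself is the open question of this ticket; the sketch only demonstrates the mechanism.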