[ 
https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279627#comment-17279627
 ] 

Jichao Wang edited comment on FLINK-21274 at 2/5/21, 11:00 AM:
---------------------------------------------------------------

We can also modify 
{color:#0747a6}org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint#main{color}.
 Compared with the way I mentioned at the beginning, this way involves the 
modification of multiple main methods(e.g. 
{color:#0747a6}org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint#main{color},
 
{color:#0747a6}org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint{color}#main,
 and so on...).

org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint#main
{code:java}
public static void main(String[] args) {
   // startup checks and logging
   EnvironmentInformation.logEnvironmentInfo(LOG, 
YarnJobClusterEntrypoint.class.getSimpleName(), args);
   SignalHandler.register(LOG);
   JvmShutdownSafeguard.installAsShutdownHook(LOG);

   Map<String, String> env = System.getenv();

   final String workingDirectory = 
env.get(ApplicationConstants.Environment.PWD.key());
   Preconditions.checkArgument(
      workingDirectory != null,
      "Working directory variable (%s) not set",
      ApplicationConstants.Environment.PWD.key());

   try {
      YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
   } catch (IOException e) {
      LOG.warn("Could not log YARN environment information.", e);
   }

   Configuration configuration = 
YarnEntrypointUtils.loadConfiguration(workingDirectory, env);

   YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new 
YarnJobClusterEntrypoint(
      configuration);

   ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);

   int returnCode;
   Throwable throwable = null;
   try {
      returnCode = 
yarnJobClusterEntrypoint.getTerminationFuture().get().processExitCode();
   } catch (Throwable e) {
      throwable = e;
      returnCode = RUNTIME_FAILURE_RETURN_CODE;
   }

   LOG.info("Terminating cluster entrypoint process {} with exit code {}.",
      yarnJobClusterEntrypoint.getClass().getSimpleName(),
      returnCode, throwable);
   System.exit(returnCode);
}
{code}
All my modifications are to make the main thread exit last. 

Is there a better way to solve this problem?


was (Author: wjc920):
We can also modify 
{color:#0747a6}org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint#main{color}.
 Compared with the method I mentioned at the beginning, this method involves 
the modification of multiple main methods(e.g. 
{color:#0747a6}org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint#main{color},
 
{color:#0747a6}org.apache.flink.mesos.entrypoint.MesosJobClusterEntrypoint{color}#main,
 and so on...).

org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint#main
{code:java}
public static void main(String[] args) {
   // startup checks and logging
   EnvironmentInformation.logEnvironmentInfo(LOG, 
YarnJobClusterEntrypoint.class.getSimpleName(), args);
   SignalHandler.register(LOG);
   JvmShutdownSafeguard.installAsShutdownHook(LOG);

   Map<String, String> env = System.getenv();

   final String workingDirectory = 
env.get(ApplicationConstants.Environment.PWD.key());
   Preconditions.checkArgument(
      workingDirectory != null,
      "Working directory variable (%s) not set",
      ApplicationConstants.Environment.PWD.key());

   try {
      YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
   } catch (IOException e) {
      LOG.warn("Could not log YARN environment information.", e);
   }

   Configuration configuration = 
YarnEntrypointUtils.loadConfiguration(workingDirectory, env);

   YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new 
YarnJobClusterEntrypoint(
      configuration);

   ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);

   int returnCode;
   Throwable throwable = null;
   try {
      returnCode = 
yarnJobClusterEntrypoint.getTerminationFuture().get().processExitCode();
   } catch (Throwable e) {
      throwable = e;
      returnCode = RUNTIME_FAILURE_RETURN_CODE;
   }

   LOG.info("Terminating cluster entrypoint process {} with exit code {}.",
      yarnJobClusterEntrypoint.getClass().getSimpleName(),
      returnCode, throwable);
   System.exit(returnCode);
}
{code}
All my modifications are to make the main thread exit last. 

Is there a better way to solve this problem?

> At per-job mode,if the HDFS write is slow(about 5 seconds), the flink job 
> archive file will upload fails
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21274
>                 URL: https://issues.apache.org/jira/browse/FLINK-21274
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.1
>            Reporter: Jichao Wang
>            Priority: Critical
>         Attachments: 1.png, 2.png, Add wait 5 seconds in 
> org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log, Not add wait 
> 5 seconds.log, application_1612404624605_0010-JobManager.log
>
>
> This is a partial configuration of my Flink History service(flink-conf.yaml), 
> and this is also the configuration of my Flink client.
> {code:java}
> jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> {code}
> I used {color:#0747a6}flink run -m yarn-cluster 
> /cloud/service/flink/examples/batch/WordCount.jar{color} to submit a 
> WorkCount task to the Yarn cluster. Under normal circumstances, after the 
> task is completed, the flink job execution information will be archived to 
> HDFS, and then the JobManager process will exit. However, when this archiving 
> process takes a long time (maybe the HDFS write speed is slow), the task 
> archive file upload fails.
> The specific reproduction method is as follows:
> Modify the 
> {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
>  method to wait 5 seconds before actually writing to HDFS (simulating a slow 
> write speed scenario).
> {code:java}
> public static Path archiveJob(Path rootPath, JobID jobId, 
> Collection<ArchivedJson> jsonToArchive) 
>     throws IOException {
>     try {
>         FileSystem fs = rootPath.getFileSystem();
>         Path path = new Path(rootPath, jobId.toString());
>         OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
>         try {
>             LOG.info("===========================Wait 5 seconds..");
>             Thread.sleep(5000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>         try (JsonGenerator gen = jacksonFactory.createGenerator(out, 
> JsonEncoding.UTF8)) {
>             ...  // Part of the code is omitted here
>         } catch (Exception e) {
>             fs.delete(path, false);
>             throw e;
>         }
>         LOG.info("Job {} has been archived at {}.", jobId, path);
>         return path;
>     } catch (IOException e) {
>         LOG.error("Failed to archive job.", e);
>         throw e;
>     }
> }
> {code}
> After I make the above changes to the code, I cannot find the corresponding 
> task on Flink's HistoryServer(Refer to Figure 1.png and Figure 2.png).
> Then I went to Yarn to browse the JobManager log (see attachment 
> application_1612404624605_0010-JobManager.log for log details), and found 
> that the following logs are missing in the task log:
> {code:java}
> INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process 
> YarnJobClusterEntrypoint with exit code 0.{code}
> Usually, if the task exits normally, a similar log will be printed before 
> executing {color:#0747a6}System.exit(returnCode){color}.
> If no Exception information is found in the JobManager log, the above 
> situation occurs, indicating that the JobManager is running to a certain 
> point, and there is no user thread in the JobManager process, which causes 
> the program to exit without completing the normal process.
> Eventually I found out that multiple services (for example: ioExecutor, 
> metricRegistry, commonRpcService) were exited asynchronously in 
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color},
>  and multiple services would be exited in the shutdown() method of 
> metricRegistry (for example : executor), these exit actions are executed 
> asynchronously and in parallel. If ioExecutor or executor exits last, it will 
> cause the above problems.The root cause is that the threads in these two 
> Executors are daemon threads, while the threads in Akka Actor are user 
> threads.
> I hope to modify the following code to fix this bug. If it is determined that 
> this is a bug (this problem will affect all versions above 1.9), please 
> assign the ticket to me, thank you.
> Only need to modify the 
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color}
>  method:
> After fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>    final String clusterEntrypointName = 
> clusterEntrypoint.getClass().getSimpleName();
>    try {
>       clusterEntrypoint.startCluster();
>    } catch (ClusterEntrypointException e) {
>       LOG.error(String.format("Could not start cluster entrypoint %s.", 
> clusterEntrypointName), e);
>       System.exit(STARTUP_FAILURE_RETURN_CODE);
>    }
>    int returnCode;
>    Throwable throwable = null;
>    try {
>       returnCode = 
> clusterEntrypoint.getTerminationFuture().get().processExitCode();
>    } catch (Throwable e) {
>       throwable = e;
>       returnCode = RUNTIME_FAILURE_RETURN_CODE;
>    }
>    LOG.info("Terminating cluster entrypoint process {} with exit code {}.", 
> clusterEntrypointName, returnCode, throwable);
>    System.exit(returnCode);
> }{code}
>  Before fixing: 
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>    final String clusterEntrypointName = 
> clusterEntrypoint.getClass().getSimpleName();
>    try {
>       clusterEntrypoint.startCluster();
>    } catch (ClusterEntrypointException e) {
>       LOG.error(String.format("Could not start cluster entrypoint %s.", 
> clusterEntrypointName), e);
>       System.exit(STARTUP_FAILURE_RETURN_CODE);
>    }
>    clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, 
> throwable) -> {
>       final int returnCode;
>       if (throwable != null) {
>          returnCode = RUNTIME_FAILURE_RETURN_CODE;
>       } else {
>          returnCode = applicationStatus.processExitCode();
>       }
>       LOG.info("Terminating cluster entrypoint process {} with exit code 
> {}.", clusterEntrypointName, returnCode, throwable);
>       System.exit(returnCode);
>    });
> }
> {code}
> The purpose of the modification is to ensure that the Main thread exits last.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to