[ 
https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279454#comment-17279454
 ] 

Jichao Wang edited comment on FLINK-21274 at 2/5/21, 8:52 AM:
--------------------------------------------------------------

Let me try to explain this in more detail:

I added some logs that print thread information to 
{color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint{color} to 
get the status before the JobManager exits. The code is modified as follows:

// code1
{code:java}
/**
 * Starts the given cluster entrypoint and registers a callback on its termination future that
 * ends the process via System.exit.
 *
 * <p>The section marked "code1-1" is debug instrumentation: it logs which thread runs the
 * callback, whether that thread is a daemon, and all live threads, right before exiting.
 *
 * @param clusterEntrypoint the entrypoint to start and whose termination future is observed
 */
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
  final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
  try {
    clusterEntrypoint.startCluster();
  } catch (ClusterEntrypointException e) {
    LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
    System.exit(STARTUP_FAILURE_RETURN_CODE);
  }

  // code1-1
  clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
    final Thread current = Thread.currentThread();
    LOG.info("================clusterEntrypoint.getTerminationFuture isTerminal======================================");
    LOG.info("===Current thread name is: {}, isDaemon: {}", current.getName(), current.isDaemon());
    // printThreads (code4) takes a label; the empty label keeps its output in line with the
    // unprefixed "===Current thread name is" message above.
    printThreads("");

    // A failed termination future maps to the runtime-failure code; otherwise the
    // application status decides the exit code.
    final int returnCode =
        throwable != null ? RUNTIME_FAILURE_RETURN_CODE : applicationStatus.processExitCode();

    LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
    System.exit(returnCode);
  });
}{code}
// code2
{code:java}
private void cleanupDirectories() throws IOException {
                LOG.info("===================Starting 
cleanupDirectories================================");
                LOG.info("cleanupDirectories===Current thread name is: " + 
Thread.currentThread().getName()
                        + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("cleanupDirectories");
                ShutdownHookUtil.removeShutdownHook(shutDownHook, 
getClass().getSimpleName(), LOG);             
                final String webTmpDir = 
configuration.getString(WebOptions.TMP_DIR);
                FileUtils.deleteDirectory(new File(webTmpDir));
}
{code}
 

// code3
{code:java}
/**
 * Stops the cluster services while holding {@code lock}.
 *
 * <p>blobServer, haServices, archivedExecutionGraphStore and processMetricGroup are closed
 * synchronously. metricRegistry, ioExecutor and commonRpcService are shut down asynchronously;
 * their futures are collected and combined into the returned future. The callbacks marked
 * code3-1, code3-2 and code3-3 are debug instrumentation that logs the executing thread and
 * all live threads.
 *
 * @param cleanupHaData if true, haServices.closeAndCleanupAllData() is called instead of
 *     haServices.close()
 * @return the result of FutureUtils.completeAll over the collected termination futures,
 *     including an exceptionally completed future if one of the synchronous closes threw
 */
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 final long shutdownTimeout = 
configuration.getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);

 synchronized (lock) {
 // Failures of the synchronous closes are accumulated here instead of being thrown.
 Throwable exception = null;

 final Collection<CompletableFuture<Void>> terminationFutures = new 
ArrayList<>(3);

 if (blobServer != null) {
   try {
     blobServer.close();
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   }
 }

 if (haServices != null) {
   try {
     if (cleanupHaData) {
       haServices.closeAndCleanupAllData();
     } else {
       haServices.close();
     }
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   } 
 }

 if (archivedExecutionGraphStore != null) {
   try {
     archivedExecutionGraphStore.close();
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   }
 }

 // Unlike the closes above, this one is not wrapped in try/catch.
 if (processMetricGroup != null) {
   processMetricGroup.close();
 }

 if (metricRegistry != null) {
   LOG.info("===metricRegistry is not null");
   CompletableFuture<Void> futureMetricRegistry = metricRegistry.shutdown();
   terminationFutures.add(futureMetricRegistry);
   // The callback ignores its throwable, so the "successfully" message is logged on failure too.
   futureMetricRegistry.whenComplete((aVoid, throwable) -> {
     // code3-1
     LOG.info("========================metricRegistry shutdowns 
successfully===================================");
     LOG.info("metricRegistry===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("metricRegistry");
   });
 }

 if (ioExecutor != null) {
   LOG.info("===ioExecutor is not null");
   // Shutdown is started without blocking; shutdownTimeout is passed in milliseconds.
   CompletableFuture<Void> futureIoExecutor = 
ExecutorUtils.nonBlockingShutdown(shutdownTimeout,
   TimeUnit.MILLISECONDS, ioExecutor);
   terminationFutures.add(futureIoExecutor);
   futureIoExecutor.whenComplete((aVoid, throwable) -> {
     // code3-2
     LOG.info("==============ioExecutor shutdowns 
successfully==========================");
     LOG.info("ioExecutor===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("ioExecutor");
   });
 }

 if (commonRpcService != null) {
   LOG.info("===commonRpcService is not null");
   CompletableFuture<Void> futureCommonRpcService = 
commonRpcService.stopService();
   terminationFutures.add(futureCommonRpcService);
   futureCommonRpcService.whenComplete((aVoid, throwable) -> {
     // code3-3
     LOG.info("============================commonRpcService shutdowns 
successfully=====================================");
     LOG.info("commonRpcService===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("commonRpcService");
   });

 }

 // Surface any accumulated synchronous failure through the combined future.
 if (exception != null) {
   terminationFutures.add(FutureUtils.completedExceptionally(exception));
 }

 return FutureUtils.completeAll(terminationFutures);
 // NOTE(review): as pasted, only the synchronized block is closed below; the method's own
 // closing brace appears to have been dropped from the snippet.
 }{code}
// code4
{code:java}
/**
 * Logs the number of threads enumerated from the root thread group, followed by each thread's
 * name and daemon flag.
 *
 * @param name label prepended to every log line
 */
public static void printThreads(String name) {
  // Climb from the current thread's group up to the root of the group hierarchy.
  ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
  for (ThreadGroup parent = rootGroup; parent != null; parent = parent.getParent()) {
    rootGroup = parent;
  }

  // Over-allocate: activeCount() is only an estimate and enumerate() silently drops
  // threads that do not fit into the array.
  Thread[] buffer = new Thread[rootGroup.activeCount() * 2];
  int threadCount = rootGroup.enumerate(buffer);

  LOG.info(name + "===Threads size is " + threadCount);
  for (int i = 0; i < threadCount; i++) {
    Thread thread = buffer[i];
    LOG.info(name + "===Thread name : " + thread.getName() + ", isDaemon: " + thread.isDaemon());
  }
}{code}
 
 The attached log file description:

 

[^Add wait 5 seconds in 
org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log] This log file 
corresponds to {color:#0747a6}code5{color}, in which a 5-second wait is added 
to the 
{color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
 method.

 

[^Not add wait 5 seconds.log] This log file corresponds to 
{color:#0747a6}code6{color}, in which the 5-second wait is removed from the 
{color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
 method.

// code5
{code:java}
/**
 * Reproduction variant of archiveJob: creates the archive file and then sleeps for five
 * seconds before writing, to simulate a slow file system.
 *
 * <p>The file is created at rootPath/jobId with WriteMode.NO_OVERWRITE. If writing throws, the
 * file is deleted again and the exception is rethrown. The code that writes jsonToArchive is
 * omitted from this snippet.
 *
 * @param rootPath directory under which the archive file is created
 * @param jobId job whose id is used as the file name
 * @param jsonToArchive entries to archive; how they are written is not shown here
 * @return the path of the created archive file
 * @throws IOException logged with "Failed to archive job." and rethrown
 */
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
    throws IOException {
  try {
    FileSystem fs = rootPath.getFileSystem();
    Path path = new Path(rootPath, jobId.toString());
    OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

    try {
      LOG.info("===========================Wait 5 seconds..");
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so that an interruption (e.g. from an executor shutdown)
      // stays visible to the code that follows, and report it through the logger.
      Thread.currentThread().interrupt();
      LOG.warn("Interrupted while waiting before archiving job {}.", jobId, e);
    }

    try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
      ... // Part of the code is omitted here
    } catch (Exception e) {
      // Do not leave a partially written archive behind.
      fs.delete(path, false);
      throw e;
    }
    LOG.info("Job {} has been archived at {}.", jobId, path);
    return path;
  } catch (IOException e) {
    LOG.error("Failed to archive job.", e);
    throw e;
  }
}{code}
// code6
{code:java}
/**
 * Variant of archiveJob without the artificial 5-second delay.
 *
 * <p>Creates the file rootPath/jobId with WriteMode.NO_OVERWRITE and writes through a
 * JsonGenerator (the writing code is omitted from this snippet). If writing throws, the file
 * is deleted again and the exception is rethrown.
 *
 * @param rootPath directory under which the archive file is created
 * @param jobId job whose id is used as the file name
 * @param jsonToArchive entries to archive; how they are written is not shown here
 * @return the path of the created archive file
 * @throws IOException logged with "Failed to archive job." and rethrown
 */
public static Path archiveJob(Path rootPath, JobID jobId, 
Collection<ArchivedJson> jsonToArchive) 
   throws IOException {
   try {
     FileSystem fs = rootPath.getFileSystem();
     Path path = new Path(rootPath, jobId.toString());
     OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
 
     // There is no logic to wait for 5 seconds here
     try (JsonGenerator gen = jacksonFactory.createGenerator(out, 
JsonEncoding.UTF8)) {
       ... // Part of the code is omitted here
     } catch (Exception e) {
       // Remove the partially written file before propagating the failure.
       fs.delete(path, false);
       throw e;
     }
     LOG.info("Job {} has been archived at {}.", jobId, path);
     return path;
   } catch (IOException e) {
     LOG.error("Failed to archive job.", e);
     throw e;
   }
}{code}
I think that the thread information printed in {color:#0747a6}code1-1{color} 
and {color:#0747a6}code2{color} is the thread information before the JobManager 
process exits.

If the 5-second wait is added, {color:#0747a6}code3-2{color} and 
{color:#0747a6}code1-1{color} are never executed, which means that ioExecutor 
does not shut down normally. In that case {color:#0747a6}code2{color} is 
executed by the YarnJobClusterEntrypoint shutdown hook. Why does this happen? 
I think we need to look at it carefully.

If you delete the 5-second wait, we find that the program exits normally, and 
{color:#0747a6}code1-1{color} is executed by the 
{color:#0747a6}flink-akka.actor.default-dispatcher-16{color} thread. 
Corresponding to this, there is another situation. If metricRegistry exits 
after commonRpcService,{color:#0747a6} code1-1{color} will be executed by 
{color:#0747a6}flink-metrics-scheduler-1{color}.

Before the JobManager process exits, it shuts down several thread pools in 
parallel. Depending on the order in which these pools finish, a different 
thread executes {color:#0747a6}code1-1{color}. If the pool that finishes last 
is a daemon pool (for example ioExecutor), {color:#0747a6}code1-1{color} is 
not executed at all, because the JVM exits once only daemon threads remain. 
This is the root cause of the problem.

If the 
{color:#0747a6}org.apache.flink.runtime.util.ExecutorThreadFactory#newThread{color}
 method corresponding to ioExecutor is modified from 
{color:#0747a6}code7{color} to {color:#0747a6}code8{color}, after doing so, 
even if we wait 10 seconds in {color:#0747a6}code5{color}, 
{color:#0747a6}code1-1{color} will be executed. At this time, the thread 
executing code1-1 will be 
{color:#0747a6}ForkJoinPool.commonPool-worker-57{color}

// code7
{code:java}
 /**
  * Creates a daemon thread for the given task. The thread name is namePrefix followed by a
  * running counter.
  *
  * @param runnable task the new thread will run
  * @return the configured, not yet started thread
  */
 @Override
 public Thread newThread(Runnable runnable) {
   final String threadName = namePrefix + threadNumber.getAndIncrement();
   final Thread thread = new Thread(group, runnable, threadName);

   thread.setDaemon(true);
   thread.setPriority(threadPriority);

   // The uncaught-exception handler is optional; install it only when one is configured.
   if (exceptionHandler != null) {
     thread.setUncaughtExceptionHandler(exceptionHandler);
   }

   return thread;
 }{code}
// code8
{code:java}
/**
 * Creates a non-daemon (user) thread for the given task. The thread name is namePrefix
 * followed by a running counter.
 *
 * @param runnable task the new thread will run
 * @return the configured, not yet started thread
 */
@Override
public Thread newThread(Runnable runnable) {
  final String threadName = namePrefix + threadNumber.getAndIncrement();
  final Thread thread = new Thread(group, runnable, threadName);

  thread.setDaemon(false);
  thread.setPriority(threadPriority);

  // The uncaught-exception handler is optional; install it only when one is configured.
  if (exceptionHandler != null) {
    thread.setUncaughtExceptionHandler(exceptionHandler);
  }
  return thread;
}{code}
I think {color:#0747a6}code1-1{color} needs to be executed by the main thread; 
otherwise, if a daemon thread pool is the last one to exit, 
{color:#0747a6}code1-1{color} will not be executed.

[~fly_in_gis] What do you think?


was (Author: wjc920):
Let me try to explain this in more detail:

I added some logs that print thread information to 
{color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint{color} to 
get the status before the JobManager exits. The code is modified as follows:

// code1
{code:java}
/**
 * Starts the given cluster entrypoint and registers a callback on its termination future that
 * ends the process via System.exit.
 *
 * <p>The section marked "code1-1" is debug instrumentation: it logs which thread runs the
 * callback, whether that thread is a daemon, and all live threads, right before exiting.
 *
 * @param clusterEntrypoint the entrypoint to start and whose termination future is observed
 */
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
  final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
  try {
    clusterEntrypoint.startCluster();
  } catch (ClusterEntrypointException e) {
    LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
    System.exit(STARTUP_FAILURE_RETURN_CODE);
  }

  // code1-1
  clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
    final Thread current = Thread.currentThread();
    LOG.info("================clusterEntrypoint.getTerminationFuture isTerminal======================================");
    LOG.info("===Current thread name is: {}, isDaemon: {}", current.getName(), current.isDaemon());
    // printThreads (code4) takes a label; the empty label keeps its output in line with the
    // unprefixed "===Current thread name is" message above.
    printThreads("");

    // A failed termination future maps to the runtime-failure code; otherwise the
    // application status decides the exit code.
    final int returnCode =
        throwable != null ? RUNTIME_FAILURE_RETURN_CODE : applicationStatus.processExitCode();

    LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
    System.exit(returnCode);
  });
}{code}
// code2
{code:java}
private void cleanupDirectories() throws IOException {
                LOG.info("===================Starting 
cleanupDirectories================================");
                LOG.info("cleanupDirectories===Current thread name is: " + 
Thread.currentThread().getName()
                        + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("cleanupDirectories");
                ShutdownHookUtil.removeShutdownHook(shutDownHook, 
getClass().getSimpleName(), LOG);             
                final String webTmpDir = 
configuration.getString(WebOptions.TMP_DIR);
                FileUtils.deleteDirectory(new File(webTmpDir));
}
{code}
 

// code3
{code:java}
/**
 * Stops the cluster services while holding {@code lock}.
 *
 * <p>blobServer, haServices, archivedExecutionGraphStore and processMetricGroup are closed
 * synchronously. metricRegistry, ioExecutor and commonRpcService are shut down asynchronously;
 * their futures are collected and combined into the returned future. The callbacks marked
 * code3-1, code3-2 and code3-3 are debug instrumentation that logs the executing thread and
 * all live threads.
 *
 * @param cleanupHaData if true, haServices.closeAndCleanupAllData() is called instead of
 *     haServices.close()
 * @return the result of FutureUtils.completeAll over the collected termination futures,
 *     including an exceptionally completed future if one of the synchronous closes threw
 */
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 final long shutdownTimeout = 
configuration.getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);

 synchronized (lock) {
 // Failures of the synchronous closes are accumulated here instead of being thrown.
 Throwable exception = null;

 final Collection<CompletableFuture<Void>> terminationFutures = new 
ArrayList<>(3);

 if (blobServer != null) {
   try {
     blobServer.close();
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   }
 }

 if (haServices != null) {
   try {
     if (cleanupHaData) {
       haServices.closeAndCleanupAllData();
     } else {
       haServices.close();
     }
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   } 
 }

 if (archivedExecutionGraphStore != null) {
   try {
     archivedExecutionGraphStore.close();
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   }
 }

 // Unlike the closes above, this one is not wrapped in try/catch.
 if (processMetricGroup != null) {
   processMetricGroup.close();
 }

 if (metricRegistry != null) {
   LOG.info("===metricRegistry is not null");
   CompletableFuture<Void> futureMetricRegistry = metricRegistry.shutdown();
   terminationFutures.add(futureMetricRegistry);
   // The callback ignores its throwable, so the "successfully" message is logged on failure too.
   futureMetricRegistry.whenComplete((aVoid, throwable) -> {
     // code3-1
     LOG.info("========================metricRegistry shutdowns 
successfully===================================");
     LOG.info("metricRegistry===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("metricRegistry");
   });
 }

 if (ioExecutor != null) {
   LOG.info("===ioExecutor is not null");
   // Shutdown is started without blocking; shutdownTimeout is passed in milliseconds.
   CompletableFuture<Void> futureIoExecutor = 
ExecutorUtils.nonBlockingShutdown(shutdownTimeout,
   TimeUnit.MILLISECONDS, ioExecutor);
   terminationFutures.add(futureIoExecutor);
   futureIoExecutor.whenComplete((aVoid, throwable) -> {
     // code3-2
     LOG.info("==============ioExecutor shutdowns 
successfully==========================");
     LOG.info("ioExecutor===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("ioExecutor");
   });
 }

 if (commonRpcService != null) {
   LOG.info("===commonRpcService is not null");
   CompletableFuture<Void> futureCommonRpcService = 
commonRpcService.stopService();
   terminationFutures.add(futureCommonRpcService);
   futureCommonRpcService.whenComplete((aVoid, throwable) -> {
     // code3-3
     LOG.info("============================commonRpcService shutdowns 
successfully=====================================");
     LOG.info("commonRpcService===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("commonRpcService");
   });

 }

 // Surface any accumulated synchronous failure through the combined future.
 if (exception != null) {
   terminationFutures.add(FutureUtils.completedExceptionally(exception));
 }

 return FutureUtils.completeAll(terminationFutures);
 // NOTE(review): as pasted, only the synchronized block is closed below; the method's own
 // closing brace appears to have been dropped from the snippet.
 }{code}
// code4
{code:java}
/**
 * Logs the number of threads enumerated from the root thread group, followed by each thread's
 * name and daemon flag.
 *
 * @param name label prepended to every log line
 */
public static void printThreads(String name) {
  // Climb from the current thread's group up to the root of the group hierarchy.
  ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
  for (ThreadGroup parent = rootGroup; parent != null; parent = parent.getParent()) {
    rootGroup = parent;
  }

  // Over-allocate: activeCount() is only an estimate and enumerate() silently drops
  // threads that do not fit into the array.
  Thread[] buffer = new Thread[rootGroup.activeCount() * 2];
  int threadCount = rootGroup.enumerate(buffer);

  LOG.info(name + "===Threads size is " + threadCount);
  for (int i = 0; i < threadCount; i++) {
    Thread thread = buffer[i];
    LOG.info(name + "===Thread name : " + thread.getName() + ", isDaemon: " + thread.isDaemon());
  }
}{code}
 
 The attached log file description:

 

[^Add wait 5 seconds in 
org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log] This log file 
corresponds to {color:#0747a6}code5{color}, in which a 5-second wait is added 
to the 
{color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
 method.

 

[^Not add wait 5 seconds.log] This log file corresponds to 
{color:#0747a6}code6{color}, in which the 5-second wait is removed from the 
{color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
 method.

// code5
{code:java}
/**
 * Reproduction variant of archiveJob: creates the archive file and then sleeps for five
 * seconds before writing, to simulate a slow file system.
 *
 * <p>The file is created at rootPath/jobId with WriteMode.NO_OVERWRITE. If writing throws, the
 * file is deleted again and the exception is rethrown. The code that writes jsonToArchive is
 * omitted from this snippet.
 *
 * @param rootPath directory under which the archive file is created
 * @param jobId job whose id is used as the file name
 * @param jsonToArchive entries to archive; how they are written is not shown here
 * @return the path of the created archive file
 * @throws IOException logged with "Failed to archive job." and rethrown
 */
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
    throws IOException {
  try {
    FileSystem fs = rootPath.getFileSystem();
    Path path = new Path(rootPath, jobId.toString());
    OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

    try {
      LOG.info("===========================Wait 5 seconds..");
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so that an interruption (e.g. from an executor shutdown)
      // stays visible to the code that follows, and report it through the logger.
      Thread.currentThread().interrupt();
      LOG.warn("Interrupted while waiting before archiving job {}.", jobId, e);
    }

    try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
      ... // Part of the code is omitted here
    } catch (Exception e) {
      // Do not leave a partially written archive behind.
      fs.delete(path, false);
      throw e;
    }
    LOG.info("Job {} has been archived at {}.", jobId, path);
    return path;
  } catch (IOException e) {
    LOG.error("Failed to archive job.", e);
    throw e;
  }
}{code}
// code6
{code:java}
/**
 * Variant of archiveJob without the artificial 5-second delay.
 *
 * <p>Creates the file rootPath/jobId with WriteMode.NO_OVERWRITE and writes through a
 * JsonGenerator (the writing code is omitted from this snippet). If writing throws, the file
 * is deleted again and the exception is rethrown.
 *
 * @param rootPath directory under which the archive file is created
 * @param jobId job whose id is used as the file name
 * @param jsonToArchive entries to archive; how they are written is not shown here
 * @return the path of the created archive file
 * @throws IOException logged with "Failed to archive job." and rethrown
 */
public static Path archiveJob(Path rootPath, JobID jobId, 
Collection<ArchivedJson> jsonToArchive) 
   throws IOException {
   try {
     FileSystem fs = rootPath.getFileSystem();
     Path path = new Path(rootPath, jobId.toString());
     OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
 
     // There is no logic to wait for 5 seconds here
     try (JsonGenerator gen = jacksonFactory.createGenerator(out, 
JsonEncoding.UTF8)) {
       ... // Part of the code is omitted here
     } catch (Exception e) {
       // Remove the partially written file before propagating the failure.
       fs.delete(path, false);
       throw e;
     }
     LOG.info("Job {} has been archived at {}.", jobId, path);
     return path;
   } catch (IOException e) {
     LOG.error("Failed to archive job.", e);
     throw e;
   }
}{code}
I think that the thread information printed in {color:#0747a6}code1-1{color} 
and {color:#0747a6}code2{color} is the thread information before the JobManager 
process exits.

If the 5-second wait is added, {color:#0747a6}code3-2{color} and 
{color:#0747a6}code1-1{color} are never executed, which means that ioExecutor 
does not shut down normally. In that case {color:#0747a6}code2{color} is 
executed by the YarnJobClusterEntrypoint shutdown hook. Why does this happen? 
I think we need to look at it carefully.

If you delete the 5-second wait, we find that the program exits normally, and 
{color:#0747a6}code1-1{color} is executed by the 
{color:#0747a6}flink-akka.actor.default-dispatcher-16{color} thread. 
Corresponding to this, there is another situation. If metricRegistry exits 
after commonRpcService,{color:#0747a6} code1-1{color} will be executed by 
{color:#0747a6}flink-metrics-scheduler-1{color}.

Before the JobManager process exits, it shuts down several thread pools in 
parallel. Depending on the order in which these pools finish, a different 
thread executes {color:#0747a6}code1-1{color}. If the pool that finishes last 
is a daemon pool (for example ioExecutor), {color:#0747a6}code1-1{color} is 
not executed at all, because the JVM exits once only daemon threads remain. 
This is the root cause of the problem.

If the 
{color:#0747a6}org.apache.flink.runtime.util.ExecutorThreadFactory#newThread{color}
 method corresponding to ioExecutor is modified from 
{color:#0747a6}code7{color} to {color:#0747a6}code8{color}, after doing so, 
even if we wait 10 seconds in {color:#0747a6}code5{color}, 
{color:#0747a6}code1-1{color} will be executed. At this time, the thread 
executing code1-1 will be 
{color:#0747a6}ForkJoinPool.commonPool-worker-57{color}

// code7
{code:java}
 /**
  * Creates a daemon thread for the given task. The thread name is namePrefix followed by a
  * running counter.
  *
  * @param runnable task the new thread will run
  * @return the configured, not yet started thread
  */
 @Override
 public Thread newThread(Runnable runnable) {
   final String threadName = namePrefix + threadNumber.getAndIncrement();
   final Thread thread = new Thread(group, runnable, threadName);

   thread.setDaemon(true);
   thread.setPriority(threadPriority);

   // The uncaught-exception handler is optional; install it only when one is configured.
   if (exceptionHandler != null) {
     thread.setUncaughtExceptionHandler(exceptionHandler);
   }

   return thread;
 }{code}
// code8
{code:java}
/**
 * Creates a non-daemon (user) thread for the given task. The thread name is namePrefix
 * followed by a running counter.
 *
 * @param runnable task the new thread will run
 * @return the configured, not yet started thread
 */
@Override
public Thread newThread(Runnable runnable) {
  final String threadName = namePrefix + threadNumber.getAndIncrement();
  final Thread thread = new Thread(group, runnable, threadName);

  thread.setDaemon(false);
  thread.setPriority(threadPriority);

  // The uncaught-exception handler is optional; install it only when one is configured.
  if (exceptionHandler != null) {
    thread.setUncaughtExceptionHandler(exceptionHandler);
  }
  return thread;
}{code}
I think {color:#0747a6}code1-1{color} needs to be executed by the main thread; 
otherwise, if a daemon thread pool is the last one to exit, 
{color:#0747a6}code1-1{color} will not be executed.

What do you think?

> At per-job mode,if the HDFS write is slow(about 5 seconds), the flink job 
> archive file will upload fails
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21274
>                 URL: https://issues.apache.org/jira/browse/FLINK-21274
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.1
>            Reporter: Jichao Wang
>            Priority: Critical
>         Attachments: 1.png, 2.png, Add wait 5 seconds in 
> org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log, Not add wait 
> 5 seconds.log, application_1612404624605_0010-JobManager.log
>
>
> This is a partial configuration of my Flink History service(flink-conf.yaml), 
> and this is also the configuration of my Flink client.
> {code:java}
> jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> {code}
> I used {color:#0747a6}flink run -m yarn-cluster 
> /cloud/service/flink/examples/batch/WordCount.jar{color} to submit a 
> WordCount task to the Yarn cluster. Under normal circumstances, after the 
> task is completed, the flink job execution information will be archived to 
> HDFS, and then the JobManager process will exit. However, when this archiving 
> process takes a long time (maybe the HDFS write speed is slow), the task 
> archive file upload fails.
> The specific reproduction method is as follows:
> Modify the 
> {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
>  method to wait 5 seconds before actually writing to HDFS (simulating a slow 
> write speed scenario).
> {code:java}
> public static Path archiveJob(Path rootPath, JobID jobId, 
> Collection<ArchivedJson> jsonToArchive) 
>     throws IOException {
>     try {
>         FileSystem fs = rootPath.getFileSystem();
>         Path path = new Path(rootPath, jobId.toString()); // archive file is named after the job id
>         OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE); // create only; an existing file is not overwritten
>         try {
>             LOG.info("===========================Wait 5 seconds..");
>             Thread.sleep(5000); // artificial delay that simulates a slow write
>         } catch (InterruptedException e) {
>             e.printStackTrace(); // NOTE(review): the interrupt flag is not restored here
>         }
>         try (JsonGenerator gen = jacksonFactory.createGenerator(out, 
> JsonEncoding.UTF8)) {
>             ...  // Part of the code is omitted here
>         } catch (Exception e) {
>             fs.delete(path, false); // remove the partially written file before rethrowing
>             throw e;
>         }
>         LOG.info("Job {} has been archived at {}.", jobId, path);
>         return path;
>     } catch (IOException e) {
>         LOG.error("Failed to archive job.", e); // log, then propagate to the caller
>         throw e;
>     }
> }
> {code}
> After I make the above changes to the code, I cannot find the corresponding 
> task on Flink's HistoryServer(Refer to Figure 1.png and Figure 2.png).
> Then I went to Yarn to browse the JobManager log (see attachment 
> application_1612404624605_0010-JobManager.log for log details), and found 
> that the following logs are missing in the task log:
> {code:java}
> INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process 
> YarnJobClusterEntrypoint with exit code 0.{code}
> Usually, if the task exits normally, a similar log will be printed before 
> executing {color:#0747a6}System.exit(returnCode){color}.
> If no Exception information is found in the JobManager log, the above 
> situation occurs, indicating that the JobManager is running to a certain 
> point, and there is no user thread in the JobManager process, which causes 
> the program to exit without completing the normal process.
> Eventually I found out that multiple services (for example: ioExecutor, 
> metricRegistry, commonRpcService) were exited asynchronously in 
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color},
>  and multiple services would be exited in the shutdown() method of 
> metricRegistry (for example: executor); these exit actions are executed 
> asynchronously and in parallel. If ioExecutor or executor exits last, it will 
> cause the above problem. The root cause is that the threads in these two 
> Executors are daemon threads, while the threads in Akka Actor are user 
> threads.
> I hope to modify the following code to fix this bug. If it is determined that 
> this is a bug (this problem will affect all versions above 1.9), please 
> assign the ticket to me, thank you.
> Only need to modify the 
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color}
>  method:
> After fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>    final String clusterEntrypointName = 
> clusterEntrypoint.getClass().getSimpleName();
>    try {
>       clusterEntrypoint.startCluster();
>    } catch (ClusterEntrypointException e) {
>       LOG.error(String.format("Could not start cluster entrypoint %s.", 
> clusterEntrypointName), e);
>       System.exit(STARTUP_FAILURE_RETURN_CODE); // startup failure ends the process immediately
>    }
>    int returnCode;
>    Throwable throwable = null; // stays null when the termination future completes normally
>    try {
>       returnCode = 
> clusterEntrypoint.getTerminationFuture().get().processExitCode(); // get() blocks the calling thread until the future completes
>    } catch (Throwable e) { // NOTE(review): a failure from get() presumably arrives wrapped (e.g. ExecutionException) — confirm
>       throwable = e;
>       returnCode = RUNTIME_FAILURE_RETURN_CODE;
>    }
>    LOG.info("Terminating cluster entrypoint process {} with exit code {}.", 
> clusterEntrypointName, returnCode, throwable);
>    System.exit(returnCode); // executed by the thread that called this method
> }{code}
>  Before fixing: 
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>    final String clusterEntrypointName = 
> clusterEntrypoint.getClass().getSimpleName();
>    try {
>       clusterEntrypoint.startCluster();
>    } catch (ClusterEntrypointException e) {
>       LOG.error(String.format("Could not start cluster entrypoint %s.", 
> clusterEntrypointName), e);
>       System.exit(STARTUP_FAILURE_RETURN_CODE); // startup failure ends the process immediately
>    }
>    clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, 
> throwable) -> { // runs on the thread that completes the future, or right here if it is already complete
>       final int returnCode;
>       if (throwable != null) {
>          returnCode = RUNTIME_FAILURE_RETURN_CODE; // the termination future failed
>       } else {
>          returnCode = applicationStatus.processExitCode(); // exit code derived from the application status
>       }
>       LOG.info("Terminating cluster entrypoint process {} with exit code 
> {}.", clusterEntrypointName, returnCode, throwable);
>       System.exit(returnCode);
>    }); // the method returns without waiting for the callback
> }
> {code}
> The purpose of the modification is to ensure that the Main thread exits last.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to