[ 
https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279454#comment-17279454
 ] 

Jichao Wang edited comment on FLINK-21274 at 2/5/21, 8:26 AM:
--------------------------------------------------------------

我认为我的问题和HA无关,因为我提交的任务是Flink官方提供的Example中的WordCount,并没有开启HA。

I think my problem has nothing to do with HA, because the job I submitted is 
the WordCount example shipped with Flink, and HA is not enabled.

我想尝试进一步的解释:

Let me try to explain further:

我在 org.apache.flink.runtime.entrypoint.ClusterEntrypoint 
中添加了一些打印线程信息的日志,用来获取JobManager退出前的状态,代码修改如下:

I added some logs that print thread information to 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint to get the status before 
the JobManager exits. The code is modified as follows:

// code1
{code:java}
/**
 * Starts the given cluster entrypoint and registers a callback on its termination future that
 * exits the JVM with the matching return code.
 *
 * <p>Debug instrumentation: before calling {@code System.exit}, the callback logs the name and
 * daemon flag of the thread executing it and dumps all live threads via {@code printThreads}.
 *
 * @param clusterEntrypoint the entrypoint to start and to watch for termination
 */
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

  final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
  try {
    clusterEntrypoint.startCluster();
  } catch (ClusterEntrypointException e) {
    LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
    System.exit(STARTUP_FAILURE_RETURN_CODE);
  }

  // code1-1
  // NOTE(review): this callback presumably runs on whichever thread completes the termination
  // future, which is why the executing thread is logged here.
  clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
    final int returnCode;

    LOG.info("================clusterEntrypoint.getTerminationFuture isTerminal======================================");
    LOG.info("===Current thread name is: " + Thread.currentThread().getName()
        + ", isDaemon: " + Thread.currentThread().isDaemon());
    // printThreads (code4) requires a label argument; an empty label keeps the
    // un-prefixed "===" format used by the other messages of this callback.
    printThreads("");

    if (throwable != null) {
      returnCode = RUNTIME_FAILURE_RETURN_CODE;
    } else {
      returnCode = applicationStatus.processExitCode();
    }

    LOG.info("Terminating cluster entrypoint process {} with exit code {}.",
        clusterEntrypointName, returnCode, throwable);
    System.exit(returnCode);
  });
}{code}
// code2
{code:java}
private void cleanupDirectories() throws IOException {
                LOG.info("===================Starting 
cleanupDirectories================================");
                LOG.info("cleanupDirectories===Current thread name is: " + 
Thread.currentThread().getName()
                        + ", isDaemon: " + Thread.currentThread().isDaemon());
                printThreads("cleanupDirectories");
                ShutdownHookUtil.removeShutdownHook(shutDownHook, 
getClass().getSimpleName(), LOG);             final String webTmpDir = 
configuration.getString(WebOptions.TMP_DIR);           
FileUtils.deleteDirectory(new File(webTmpDir));
}
{code}
 

 

// code3
{code:java}
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 final long shutdownTimeout = 
configuration.getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);

 synchronized (lock) {
 Throwable exception = null;

 final Collection<CompletableFuture<Void>> terminationFutures = new 
ArrayList<>(3);

 if (blobServer != null) {
   try {
     blobServer.close();
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   }
 }

 if (haServices != null) {
   try {
     if (cleanupHaData) {
       haServices.closeAndCleanupAllData();
     } else {
       haServices.close();
     }
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   } 
 }

 if (archivedExecutionGraphStore != null) {
   try {
     archivedExecutionGraphStore.close();
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   }
 }

 if (processMetricGroup != null) {
   processMetricGroup.close();
 }

 if (metricRegistry != null) {
   LOG.info("===metricRegistry is not null");
   CompletableFuture<Void> futureMetricRegistry = metricRegistry.shutdown();
   terminationFutures.add(futureMetricRegistry);
   futureMetricRegistry.whenComplete((aVoid, throwable) -> {
     // code3-1
     LOG.info("========================metricRegistry shutdowns 
successfully===================================");
     LOG.info("metricRegistry===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("metricRegistry");
   });
 }

 if (ioExecutor != null) {
   LOG.info("===ioExecutor is not null");
   CompletableFuture<Void> futureIoExecutor = 
ExecutorUtils.nonBlockingShutdown(shutdownTimeout,
   TimeUnit.MILLISECONDS, ioExecutor);
   terminationFutures.add(futureIoExecutor);
   futureIoExecutor.whenComplete((aVoid, throwable) -> {
     // code3-2
     LOG.info("==============ioExecutor shutdowns 
successfully==========================");
     LOG.info("ioExecutor===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("ioExecutor");
   });
 }

 if (commonRpcService != null) {
   LOG.info("===commonRpcService is not null");
   CompletableFuture<Void> futureCommonRpcService = 
commonRpcService.stopService();
   terminationFutures.add(futureCommonRpcService);
   futureCommonRpcService.whenComplete((aVoid, throwable) -> {
     // code3-3
     LOG.info("============================commonRpcService shutdowns 
successfully=====================================");
     LOG.info("commonRpcService===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("commonRpcService");
   });

 }

 if (exception != null) {
   terminationFutures.add(FutureUtils.completedExceptionally(exception));
 }

 return FutureUtils.completeAll(terminationFutures);
 }{code}
// code4
{code:java}
/**
 * Logs the number of live threads found under the root thread group, followed by one line per
 * thread with its name and daemon flag. Every message is prefixed with {@code name}.
 *
 * @param name label prepended to each log message
 */
public static void printThreads(String name) {
  // Climb from the current thread's group to the top-most thread group.
  ThreadGroup root = Thread.currentThread().getThreadGroup();
  while (root.getParent() != null) {
    root = root.getParent();
  }

  // activeCount() is only an estimate, so allocate twice as many slots before enumerating.
  final Thread[] buffer = new Thread[root.activeCount() * 2];
  final int liveCount = root.enumerate(buffer);

  LOG.info(name + "===Threads size is " + liveCount);
  for (int i = 0; i < liveCount; i++) {
    final Thread live = buffer[i];
    LOG.info(name + "===Thread name : " + live.getName() + ", isDaemon: " + live.isDaemon());
  }
}{code}
 

 

日志文件解释:
 The attached log file description:

 

Add wait 5 seconds in 
org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log                 

这个日志文件对应code5,在 org.apache.flink.runtime.history.FsJobArchivist#archiveJob 
方法中加入了5秒等待的逻辑

This log file corresponds to code5, in which a 5-second wait is added to the 
org.apache.flink.runtime.history.FsJobArchivist#archiveJob method.

 

Not add wait 5 seconds.log

这个日志文件对应code6, org.apache.flink.runtime.history.FsJobArchivist#archiveJob 
方法中删除5秒等待的逻辑
 This log file corresponds to code6, in which the 5-second wait has been removed 
from the org.apache.flink.runtime.history.FsJobArchivist#archiveJob method.

// code5
{code:java}
/**
 * Writes the archive of the given job to {@code rootPath/jobId}, with an artificial 5-second
 * delay before writing to simulate a slow file system (debug variant used for the reproduction).
 *
 * <p>The target file is created with {@code NO_OVERWRITE}. If writing the JSON fails, the file is
 * deleted and the exception is rethrown.
 *
 * @param rootPath directory under which the archive file is created
 * @param jobId job whose id is used as the file name
 * @param jsonToArchive the JSON documents to archive
 * @return the path of the written archive file
 * @throws IOException if creating or writing the archive fails; it is logged and rethrown
 */
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
    throws IOException {
  try {
    FileSystem fs = rootPath.getFileSystem();
    Path path = new Path(rootPath, jobId.toString());
    OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

    // Debug only: simulate a slow write.
    try {
      LOG.info("===========================Wait 5 seconds..");
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      // Do not swallow the interruption: restore the flag so callers can still observe it,
      // and report it through the logger instead of printStackTrace().
      Thread.currentThread().interrupt();
      LOG.warn("Interrupted while waiting before archiving the job.", e);
    }

    try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
      ... // Part of the code is omitted here
    } catch (Exception e) {
      // Remove the partially written archive before propagating the failure.
      fs.delete(path, false);
      throw e;
    }
    LOG.info("Job {} has been archived at {}.", jobId, path);
    return path;
  } catch (IOException e) {
    LOG.error("Failed to archive job.", e);
    throw e;
  }
}{code}
// code6
{code:java}
/**
 * Writes the archive of the given job to {@code rootPath/jobId} (variant without the artificial
 * 5-second delay).
 *
 * <p>The target file is created with {@code NO_OVERWRITE}. If writing the JSON fails, the file is
 * deleted and the exception is rethrown.
 *
 * @param rootPath directory under which the archive file is created
 * @param jobId job whose id is used as the file name
 * @param jsonToArchive the JSON documents to archive
 * @return the path of the written archive file
 * @throws IOException if creating or writing the archive fails; it is logged and rethrown
 */
public static Path archiveJob(Path rootPath, JobID jobId, 
Collection<ArchivedJson> jsonToArchive) 
   throws IOException {
   try {
     FileSystem fs = rootPath.getFileSystem();
     Path path = new Path(rootPath, jobId.toString());
     OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
 
     // Unlike code5, this variant does not wait 5 seconds before writing
     try (JsonGenerator gen = jacksonFactory.createGenerator(out, 
JsonEncoding.UTF8)) {
       ... // Part of the code is omitted here
     } catch (Exception e) {
       // Remove the partially written archive before propagating the failure
       fs.delete(path, false);
       throw e;
     }
     LOG.info("Job {} has been archived at {}.", jobId, path);
     return path;
   } catch (IOException e) {
     LOG.error("Failed to archive job.", e);
     throw e;
   }
}{code}
我认为,code1-1 和 code2 中打印的线程信息就是JobManager进程退出前的线程信息。

I think that the thread information printed in code1-1 and code2 is the thread 
information before the JobManager process exits.

如果加入5秒等待,code3-2 和 code1-1 都不会被执行,也就是说ioExecutor未能正常退出。

If the 5-second wait is added, neither code3-2 nor code1-1 is executed, which 
means that ioExecutor does not shut down normally.

code2 被 YarnJobClusterEntrypoint shutdown hook 执行。

Instead, code2 is executed by the YarnJobClusterEntrypoint shutdown hook.

如果删除5秒等待,此时我们发现程序正常退出,code1-1 被 flink-akka.actor.default-dispatcher-16 线程执行。 
与此对应的还有一种情况,如果metricRegistry 比 commonRpcService后退出,执行 code1-1 的将是 
flink-metrics-scheduler-1。

If the 5-second wait is removed, the program exits normally and code1-1 is 
executed by the flink-akka.actor.default-dispatcher-16 thread. There is also 
another case: if metricRegistry finishes shutting down after commonRpcService, 
code1-1 is executed by flink-metrics-scheduler-1.

JobManager在退出前会并行退出多个线程池,不同的线程池退出顺序导致,执行 code1-1 
的线程不同。如果守护线程池(例如:ioExecutor),最后退出,code1-1将不会被执行。这就是问题的根本原因。

Before the JobManager process exits, it shuts down several thread pools in 
parallel. Depending on the order in which these pools finish, a different 
thread executes code1-1. If a pool of daemon threads (for example ioExecutor) 
is the last one to finish, code1-1 is never executed, because the JVM exits as 
soon as only daemon threads remain. This is the root cause of the problem.

如果将ioExecutor对应的类的 
org.apache.flink.runtime.util.ExecutorThreadFactory#newThread 方法从code7 修改成 
code8,这么做以后,即使我们在code5中等待10秒,code1-1也会得到执行。此时执行code1-1的线程将是 
ForkJoinPool.commonPool-worker-57

If the org.apache.flink.runtime.util.ExecutorThreadFactory#newThread method of 
the class used by ioExecutor is changed from code7 to code8, code1-1 is 
executed even if we wait 10 seconds in code5. In that case the thread executing 
code1-1 is ForkJoinPool.commonPool-worker-57.

// code7
{code:java}
/**
 * Creates a new thread in {@code group} for the given runnable, named with {@code namePrefix}
 * followed by a running number. This is the "before" variant: the thread is a daemon thread.
 *
 * @param runnable the task the new thread will run
 * @return the configured, not yet started thread
 */
@Override
 public Thread newThread(Runnable runnable) {
 Thread t = new Thread(group, runnable, namePrefix + 
threadNumber.getAndIncrement());
 // daemon thread: it does not keep the JVM alive on its own
 t.setDaemon(true);

 t.setPriority(threadPriority);

 // optional handler for uncaught exceptions
 if (exceptionHandler != null) {
   t.setUncaughtExceptionHandler(exceptionHandler);
 }

 return t;
 }{code}
// code8
{code:java}
/**
 * Creates a new thread in {@code group} for the given runnable, named with {@code namePrefix}
 * followed by a running number. This is the "after" variant: the thread is a user (non-daemon)
 * thread.
 *
 * @param runnable the task the new thread will run
 * @return the configured, not yet started thread
 */
@Override
 public Thread newThread(Runnable runnable) {
 Thread t = new Thread(group, runnable, namePrefix + 
threadNumber.getAndIncrement());
 // user (non-daemon) thread: the JVM does not exit while it is still running
 t.setDaemon(false);

 t.setPriority(threadPriority);

 // optional handler for uncaught exceptions
 if (exceptionHandler != null) {
   t.setUncaughtExceptionHandler(exceptionHandler);
 }
  return t;
}{code}
 

What do you think?

 

 


was (Author: wjc920):
我任务我的问题和ha无关,因为我提交的任务是Flink官方提供的Example中的WordCount,并没有开启HA。

My flink job and my problem have nothing to do with ha, because the task I 
submitted is the WordCount in the Example provided by Flink, and HA is not 
turned on.

我想尝试进一步的解释:

I want to try further explanation:

我在 org.apache.flink.runtime.entrypoint.ClusterEntrypoint 
中添加了一些打印线程信息的日志,用来获取JobManager退出前的状态,代码修改如下:

I added some logs that print thread information to 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint to get the status before 
the JobManager exits. The code is modified as follows:

// code1
{code:java}
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

 final String clusterEntrypointName = 
clusterEntrypoint.getClass().getSimpleName();
 try {
   clusterEntrypoint.startCluster();
 } catch (ClusterEntrypointException e) {
   LOG.error(String.format("Could not start cluster entrypoint %s.", 
clusterEntrypointName), e);
   System.exit(STARTUP_FAILURE_RETURN_CODE);
 }
// code1-1
 clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, 
throwable) -> {
   final int returnCode;

   LOG.info("================clusterEntrypoint.getTerminationFuture 
isTerminal======================================");
   LOG.info("===Current thread name is: " + Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
 printThreads();

   if (throwable != null) {
     returnCode = RUNTIME_FAILURE_RETURN_CODE;
   } else {
     returnCode = applicationStatus.processExitCode();
   }

   LOG.info("Terminating cluster entrypoint process {} with exit code {}.", 
clusterEntrypointName, returnCode, throwable);
   System.exit(returnCode);
 });
 }{code}
// code2
 private void cleanupDirectories() throws IOException

{ LOG.info("===================Starting 
cleanupDirectories================================"); 
LOG.info("cleanupDirectories===Current thread name is: " + 
Thread.currentThread().getName() + ", isDaemon: " + 
Thread.currentThread().isDaemon()); printThreads("cleanupDirectories"); 
ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), 
LOG); final String webTmpDir = configuration.getString(WebOptions.TMP_DIR); 
FileUtils.deleteDirectory(new File(webTmpDir)); }

// code3
{code:java}
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 final long shutdownTimeout = 
configuration.getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);

 synchronized (lock) {
 Throwable exception = null;

 final Collection<CompletableFuture<Void>> terminationFutures = new 
ArrayList<>(3);

 if (blobServer != null) {
   try {
     blobServer.close();
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   }
 }

 if (haServices != null) {
   try {
     if (cleanupHaData) {
       haServices.closeAndCleanupAllData();
     } else {
       haServices.close();
     }
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   } 
 }

 if (archivedExecutionGraphStore != null) {
   try {
     archivedExecutionGraphStore.close();
   } catch (Throwable t) {
     exception = ExceptionUtils.firstOrSuppressed(t, exception);
   }
 }

 if (processMetricGroup != null) {
   processMetricGroup.close();
 }

 if (metricRegistry != null) {
   LOG.info("===metricRegistry is not null");
   CompletableFuture<Void> futureMetricRegistry = metricRegistry.shutdown();
   terminationFutures.add(futureMetricRegistry);
   futureMetricRegistry.whenComplete((aVoid, throwable) -> {
     // code3-1
     LOG.info("========================metricRegistry shutdowns 
successfully===================================");
     LOG.info("metricRegistry===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("metricRegistry");
   });
 }

 if (ioExecutor != null) {
   LOG.info("===ioExecutor is not null");
   CompletableFuture<Void> futureIoExecutor = 
ExecutorUtils.nonBlockingShutdown(shutdownTimeout,
   TimeUnit.MILLISECONDS, ioExecutor);
   terminationFutures.add(futureIoExecutor);
   futureIoExecutor.whenComplete((aVoid, throwable) -> {
     // code3-2
     LOG.info("==============ioExecutor shutdowns 
successfully==========================");
     LOG.info("ioExecutor===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("ioExecutor");
   });
 }

 if (commonRpcService != null) {
   LOG.info("===commonRpcService is not null");
   CompletableFuture<Void> futureCommonRpcService = 
commonRpcService.stopService();
   terminationFutures.add(futureCommonRpcService);
   futureCommonRpcService.whenComplete((aVoid, throwable) -> {
     // code3-3
     LOG.info("============================commonRpcService shutdowns 
successfully=====================================");
     LOG.info("commonRpcService===Current thread name is: " + 
Thread.currentThread().getName()
 + ", isDaemon: " + Thread.currentThread().isDaemon());
     printThreads("commonRpcService");
   });

 }

 if (exception != null) {
   terminationFutures.add(FutureUtils.completedExceptionally(exception));
 }

 return FutureUtils.completeAll(terminationFutures);
 }{code}
// code4
{code:java}
public static void printThreads(String name) {
 ThreadGroup group = Thread.currentThread().getThreadGroup();
 ThreadGroup topGroup = group;
 while (group != null) {
 topGroup = group;
 group = group.getParent();
 }
 int slackSize = topGroup.activeCount() * 2;
 Thread[] slackThreads = new Thread[slackSize];
 int actualSize = topGroup.enumerate(slackThreads);
 Thread[] atualThreads = new Thread[actualSize];
 System.arraycopy(slackThreads, 0, atualThreads, 0, actualSize);
 LOG.info(name+"===Threads size is " + atualThreads.length);
 for (Thread thread : atualThreads) {
 LOG.info(name+"===Thread name : " + thread.getName()+", isDaemon: " + 
thread.isDaemon());
 }

 }{code}
 

 

日志文件解释:
 The attached log file description:

 

Add wait 5 seconds in 
org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log                 

这个日志文件对应code6,在 org.apache.flink.runtime.history.FsJobArchivist#archiveJob 
方法中加入了5秒等待的逻辑

This log file corresponds to code6, and a 5-second wait logic is added to the 
org.apache.flink.runtime.history.FsJobArchivist#archiveJob method

 

Not add wait 5 seconds.log

这个日志文件对应code6, org.apache.flink.runtime.history.FsJobArchivist#archiveJob 
方法中删除5秒等待的逻辑
 This log file corresponds to code6, delete the logic to wait for 5 seconds in 
the org.apache.flink.runtime.history.FsJobArchivist#archiveJob method

// code5
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, 
Collection<ArchivedJson> jsonToArchive) 
 throws IOException {
 try {
   FileSystem fs = rootPath.getFileSystem();
   Path path = new Path(rootPath, jobId.toString());
   OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

   try {
     LOG.info("===========================Wait 5 seconds..");
     Thread.sleep(5000);
   } catch (InterruptedException e) {
     e.printStackTrace();
   }

   try (JsonGenerator gen = jacksonFactory.createGenerator(out, 
JsonEncoding.UTF8)) {
     ... // Part of the code is omitted here
   } catch (Exception e) {
 fs.delete(path, false);
 throw e;
 }
 LOG.info("Job {} has been archived at {}.", jobId, path);
 return path;
 } catch (IOException e) {
 LOG.error("Failed to archive job.", e);
 throw e;
 }
}{code}
// code6
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, 
Collection<ArchivedJson> jsonToArchive) 
   throws IOException {
   try {
     FileSystem fs = rootPath.getFileSystem();
     Path path = new Path(rootPath, jobId.toString());
     OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
 
     // There is no logic to wait for 5 seconds here
     try (JsonGenerator gen = jacksonFactory.createGenerator(out, 
JsonEncoding.UTF8)) {
       ... // Part of the code is omitted here
     } catch (Exception e) {
       fs.delete(path, false);
       throw e;
     }
     LOG.info("Job {} has been archived at {}.", jobId, path);
     return path;
   } catch (IOException e) {
     LOG.error("Failed to archive job.", e);
     throw e;
   }
}{code}
我认为,code1-1 和 code2 中打印的线程信息就是JobManager进程退出前的线程信息。

I think that the thread information printed in code1-1 and code2 is the thread 
information before the JobManager process exits.

如果加入5秒等待,code3-2将不会被执行,也就是说ioExecutor未能正常退出。

If add a 5-second wait, code3-2 and code1-1 will not be executed, which means 
that ioExecutor fails to exit normally.

code2 被 YarnJobClusterEntrypoint shutdown hook 执行。

and code2 is executed by the YarnJobClusterEntrypoint shutdown hook.

如果删除5秒等待,此时我们发现程序正常退出,code1-1 被 flink-akka.actor.default-dispatcher-16 线程执行。 
与此对应的还有一种情况,如果metricRegistry 比 commonRpcService后退出,执行 code1-1 的将是 
flink-metrics-scheduler-1。

If you delete the 5-second wait, we find that the program exits normally, and 
code1-1 is executed by the flink-akka.actor.default-dispatcher-16 thread. 
Corresponding to this, there is another situation. If metricRegistry exits 
after commonRpcService, code1-1 will be executed by flink-metrics-scheduler-1.

JobManager在退出前会并行退出多个线程池,不同的线程池退出顺序导致,执行 code1-1 
的线程不同。如果守护线程池(例如:ioExecutor),最后退出,code1-1将不会被执行。这就是问题的根本原因。

JobManager exits multiple thread pools in parallel before JobManager process 
exiting. Different thread pool exit orders result in different threads 
executing code1-1. If the thread pool is daemon pool (for example: ioExecutor) 
and finally exits, code1-1 will not be executed. This is the root cause of the 
problem.

如果将ioExecutor对应的类的 
org.apache.flink.runtime.util.ExecutorThreadFactory#newThread 方法从code7 修改成 
code8,这么做以后,即使我们在code5中等待10秒,code1-1也会得到执行。此时执行code1-1的线程将是 
ForkJoinPool.commonPool-worker-57

If the org.apache.flink.runtime.util.ExecutorThreadFactory#newThread method of 
the class corresponding to ioExecutor is modified from code7 to code8, after 
doing so, even if we wait 10 seconds in code5, code1-1 will be executed. At 
this time, the thread executing code1-1 will be 
ForkJoinPool.commonPool-worker-57

// code7
{code:java}
@Override
 public Thread newThread(Runnable runnable) {
 Thread t = new Thread(group, runnable, namePrefix + 
threadNumber.getAndIncrement());
 t.setDaemon(true);

 t.setPriority(threadPriority);

 // optional handler for uncaught exceptions
 if (exceptionHandler != null) {
   t.setUncaughtExceptionHandler(exceptionHandler);
 }

 return t;
 }{code}
// code8
{code:java}
@Override
 public Thread newThread(Runnable runnable) {
 Thread t = new Thread(group, runnable, namePrefix + 
threadNumber.getAndIncrement());
 t.setDaemon(false);

 t.setPriority(threadPriority);

 // optional handler for uncaught exceptions
 if (exceptionHandler != null) {
   t.setUncaughtExceptionHandler(exceptionHandler);
 }
  return t;
}{code}
 

What do you think?

 

 

> At per-job mode,if the HDFS write is slow(about 5 seconds), the flink job 
> archive file will upload fails
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21274
>                 URL: https://issues.apache.org/jira/browse/FLINK-21274
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.1
>            Reporter: Jichao Wang
>            Priority: Critical
>         Attachments: 1.png, 2.png, 
> application_1612404624605_0010-JobManager.log
>
>
> This is a partial configuration of my Flink History service(flink-conf.yaml), 
> and this is also the configuration of my Flink client.
> {code:java}
> jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> {code}
> I used {color:#0747a6}flink run -m yarn-cluster 
> /cloud/service/flink/examples/batch/WordCount.jar{color} to submit a 
> WordCount task to the Yarn cluster. Under normal circumstances, after the 
> task is completed, the flink job execution information will be archived to 
> HDFS, and then the JobManager process will exit. However, when this archiving 
> process takes a long time (maybe the HDFS write speed is slow), the task 
> archive file upload fails.
> The specific reproduction method is as follows:
> Modify the 
> {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
>  method to wait 5 seconds before actually writing to HDFS (simulating a slow 
> write speed scenario).
> {code:java}
> public static Path archiveJob(Path rootPath, JobID jobId, 
> Collection<ArchivedJson> jsonToArchive) 
>     throws IOException {
>     try {
>         FileSystem fs = rootPath.getFileSystem();
>         Path path = new Path(rootPath, jobId.toString());
>         OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
>         try {
>             LOG.info("===========================Wait 5 seconds..");
>             Thread.sleep(5000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>         try (JsonGenerator gen = jacksonFactory.createGenerator(out, 
> JsonEncoding.UTF8)) {
>             ...  // Part of the code is omitted here
>         } catch (Exception e) {
>             fs.delete(path, false);
>             throw e;
>         }
>         LOG.info("Job {} has been archived at {}.", jobId, path);
>         return path;
>     } catch (IOException e) {
>         LOG.error("Failed to archive job.", e);
>         throw e;
>     }
> }
> {code}
> After I make the above changes to the code, I cannot find the corresponding 
> task on Flink's HistoryServer(Refer to Figure 1.png and Figure 2.png).
> Then I went to Yarn to browse the JobManager log (see attachment 
> application_1612404624605_0010-JobManager.log for log details), and found 
> that the following logs are missing in the task log:
> {code:java}
> INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process 
> YarnJobClusterEntrypoint with exit code 0.{code}
> Usually, if the task exits normally, a similar log will be printed before 
> executing {color:#0747a6}System.exit(returnCode){color}.
> If no Exception information is found in the JobManager log, the above 
> situation occurs, indicating that the JobManager is running to a certain 
> point, and there is no user thread in the JobManager process, which causes 
> the program to exit without completing the normal process.
> Eventually I found out that multiple services (for example: ioExecutor, 
> metricRegistry, commonRpcService) were exited asynchronously in 
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color},
>  and multiple services would be exited in the shutdown() method of 
> metricRegistry (for example : executor), these exit actions are executed 
> asynchronously and in parallel. If ioExecutor or executor exits last, it will 
> cause the above problems.The root cause is that the threads in these two 
> Executors are daemon threads, while the threads in Akka Actor are user 
> threads.
> I hope to modify the following code to fix this bug. If it is determined that 
> this is a bug (this problem will affect all versions above 1.9), please 
> assign the ticket to me, thank you.
> Only need to modify the 
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color}
>  method:
> After fixing:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>    final String clusterEntrypointName = 
> clusterEntrypoint.getClass().getSimpleName();
>    try {
>       clusterEntrypoint.startCluster();
>    } catch (ClusterEntrypointException e) {
>       LOG.error(String.format("Could not start cluster entrypoint %s.", 
> clusterEntrypointName), e);
>       System.exit(STARTUP_FAILURE_RETURN_CODE);
>    }
>    int returnCode;
>    Throwable throwable = null;
>    try {
>       returnCode = 
> clusterEntrypoint.getTerminationFuture().get().processExitCode();
>    } catch (Throwable e) {
>       throwable = e;
>       returnCode = RUNTIME_FAILURE_RETURN_CODE;
>    }
>    LOG.info("Terminating cluster entrypoint process {} with exit code {}.", 
> clusterEntrypointName, returnCode, throwable);
>    System.exit(returnCode);
> }{code}
>  Before fixing: 
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>    final String clusterEntrypointName = 
> clusterEntrypoint.getClass().getSimpleName();
>    try {
>       clusterEntrypoint.startCluster();
>    } catch (ClusterEntrypointException e) {
>       LOG.error(String.format("Could not start cluster entrypoint %s.", 
> clusterEntrypointName), e);
>       System.exit(STARTUP_FAILURE_RETURN_CODE);
>    }
>    clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, 
> throwable) -> {
>       final int returnCode;
>       if (throwable != null) {
>          returnCode = RUNTIME_FAILURE_RETURN_CODE;
>       } else {
>          returnCode = applicationStatus.processExitCode();
>       }
>       LOG.info("Terminating cluster entrypoint process {} with exit code 
> {}.", clusterEntrypointName, returnCode, throwable);
>       System.exit(returnCode);
>    });
> }
> {code}
> The purpose of the modification is to ensure that the Main thread exits last.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to