[ https://issues.apache.org/jira/browse/FLINK-21274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279454#comment-17279454 ]

Jichao Wang edited comment on FLINK-21274 at 2/6/21, 7:37 AM:
--------------------------------------------------------------

Let me try to explain this further:

I added some logging that prints thread information to 
{color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint{color} in order to 
capture the state of the process just before the JobManager exits. The modified code is shown below:

// code1
{code:java}
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

	final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
	try {
		clusterEntrypoint.startCluster();
	} catch (ClusterEntrypointException e) {
		LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
		System.exit(STARTUP_FAILURE_RETURN_CODE);
	}

	// code1-1
	clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
		final int returnCode;

		LOG.info("================clusterEntrypoint.getTerminationFuture isTerminal======================================");
		LOG.info("===Current thread name is: " + Thread.currentThread().getName()
			+ ", isDaemon: " + Thread.currentThread().isDaemon());
		printThreads();

		if (throwable != null) {
			returnCode = RUNTIME_FAILURE_RETURN_CODE;
		} else {
			returnCode = applicationStatus.processExitCode();
		}

		LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
		System.exit(returnCode);
	});
}
{code}
// code2
{code:java}
private void cleanupDirectories() throws IOException {
	LOG.info("===================Starting cleanupDirectories================================");
	LOG.info("cleanupDirectories===Current thread name is: " + Thread.currentThread().getName()
		+ ", isDaemon: " + Thread.currentThread().isDaemon());
	printThreads();
	ShutdownHookUtil.removeShutdownHook(shutDownHook, getClass().getSimpleName(), LOG);

	final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);

	FileUtils.deleteDirectory(new File(webTmpDir));
}
{code}
 

// code3
{code:java}
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
	final long shutdownTimeout = configuration.getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);

	synchronized (lock) {
		Throwable exception = null;

		final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

		if (blobServer != null) {
			try {
				blobServer.close();
			} catch (Throwable t) {
				exception = ExceptionUtils.firstOrSuppressed(t, exception);
			}
		}

		if (haServices != null) {
			try {
				if (cleanupHaData) {
					haServices.closeAndCleanupAllData();
				} else {
					haServices.close();
				}
			} catch (Throwable t) {
				exception = ExceptionUtils.firstOrSuppressed(t, exception);
			}
		}

		if (archivedExecutionGraphStore != null) {
			try {
				archivedExecutionGraphStore.close();
			} catch (Throwable t) {
				exception = ExceptionUtils.firstOrSuppressed(t, exception);
			}
		}

		if (processMetricGroup != null) {
			processMetricGroup.close();
		}

		if (metricRegistry != null) {
			LOG.info("===metricRegistry is not null");
			CompletableFuture<Void> futureMetricRegistry = metricRegistry.shutdown();
			terminationFutures.add(futureMetricRegistry);
			futureMetricRegistry.whenComplete((aVoid, throwable) -> {
				// code3-1
				LOG.info("========================metricRegistry shutdowns successfully===================================");
				LOG.info("metricRegistry===Current thread name is: " + Thread.currentThread().getName()
					+ ", isDaemon: " + Thread.currentThread().isDaemon());
				printThreads();
			});
		}

		if (ioExecutor != null) {
			LOG.info("===ioExecutor is not null");
			CompletableFuture<Void> futureIoExecutor = ExecutorUtils.nonBlockingShutdown(shutdownTimeout,
				TimeUnit.MILLISECONDS, ioExecutor);
			terminationFutures.add(futureIoExecutor);
			futureIoExecutor.whenComplete((aVoid, throwable) -> {
				// code3-2
				LOG.info("==============ioExecutor shutdowns successfully==========================");
				LOG.info("ioExecutor===Current thread name is: " + Thread.currentThread().getName()
					+ ", isDaemon: " + Thread.currentThread().isDaemon());
				printThreads();
			});
		}

		if (commonRpcService != null) {
			LOG.info("===commonRpcService is not null");
			CompletableFuture<Void> futureCommonRpcService = commonRpcService.stopService();
			terminationFutures.add(futureCommonRpcService);
			futureCommonRpcService.whenComplete((aVoid, throwable) -> {
				// code3-3
				LOG.info("============================commonRpcService shutdowns successfully=====================================");
				LOG.info("commonRpcService===Current thread name is: " + Thread.currentThread().getName()
					+ ", isDaemon: " + Thread.currentThread().isDaemon());
				printThreads();
			});
		}

		if (exception != null) {
			terminationFutures.add(FutureUtils.completedExceptionally(exception));
		}

		return FutureUtils.completeAll(terminationFutures);
	}
}
{code}
// code4
{code:java}
public static void printThreads() {
	// Walk up to the root thread group so that all threads in the JVM are enumerated.
	ThreadGroup group = Thread.currentThread().getThreadGroup();
	ThreadGroup topGroup = group;
	while (group != null) {
		topGroup = group;
		group = group.getParent();
	}
	// enumerate() may return fewer threads than the array can hold, so copy only the actual count.
	int slackSize = topGroup.activeCount() * 2;
	Thread[] slackThreads = new Thread[slackSize];
	int actualSize = topGroup.enumerate(slackThreads);
	Thread[] actualThreads = new Thread[actualSize];
	System.arraycopy(slackThreads, 0, actualThreads, 0, actualSize);
	LOG.info("===Threads size is " + actualThreads.length);
	for (Thread thread : actualThreads) {
		LOG.info("===Thread name : " + thread.getName() + ", isDaemon: " + thread.isDaemon());
	}
}
{code}
 
The attached log files are described as follows:

[^Add wait 5 seconds in org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log] corresponds to {color:#0747a6}code5{color}: a 5-second wait was added to the {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color} method.

[^Not add wait 5 seconds.log] corresponds to {color:#0747a6}code6{color}: the 5-second wait was removed from the {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color} method.

// code5
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
    throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        try {
            LOG.info("===========================Wait 5 seconds..");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ...  // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}
{code}
// code6
{code:java}
public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
    throws IOException {
    try {
        FileSystem fs = rootPath.getFileSystem();
        Path path = new Path(rootPath, jobId.toString());
        OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

        // Here, we do not wait 5 seconds

        try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
            ...  // Part of the code is omitted here
        } catch (Exception e) {
            fs.delete(path, false);
            throw e;
        }
        LOG.info("Job {} has been archived at {}.", jobId, path);
        return path;
    } catch (IOException e) {
        LOG.error("Failed to archive job.", e);
        throw e;
    }
}
{code}
The thread information printed in {color:#0747a6}code1-1{color} and {color:#0747a6}code2{color} reflects the state of the threads just before the JobManager process exits.

If the 5-second wait is added, {color:#0747a6}code3-2{color} and {color:#0747a6}code1-1{color} are never executed, which means that ioExecutor does not shut down normally. Instead, {color:#0747a6}code2{color} is executed by the YarnJobClusterEntrypoint shutdown hook. Why does this happen? I think we need to look at it carefully.

If the 5-second wait is removed, the program exits normally and {color:#0747a6}code1-1{color} is executed by the {color:#0747a6}flink-akka.actor.default-dispatcher-16{color} thread. There is also another possible outcome: if metricRegistry shuts down after commonRpcService, {color:#0747a6}code1-1{color} is executed by {color:#0747a6}flink-metrics-scheduler-1{color}.
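
To illustrate why the callback thread depends on the shutdown order, here is a minimal, hypothetical sketch using plain JDK classes (the class and thread names are made up; {color:#0747a6}CompletableFuture.allOf{color} stands in for FutureUtils.completeAll): the whenComplete callback of the combined future runs on whichever thread completes the last constituent future.

{code:java}
import java.util.concurrent.CompletableFuture;

// Hypothetical demo, not Flink code: the whenComplete callback of a combined
// future is executed by the thread that completes the last constituent future.
public class CompletionThreadDemo {

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Void> metricsShutdown = new CompletableFuture<>();
        CompletableFuture<Void> rpcShutdown = new CompletableFuture<>();

        CompletableFuture.allOf(metricsShutdown, rpcShutdown).whenComplete((v, t) ->
            System.out.println("callback runs on: " + Thread.currentThread().getName()));

        // Complete the "metrics" future first, then the "rpc" future.
        Thread metricsThread = new Thread(() -> metricsShutdown.complete(null), "fake-metrics-thread");
        metricsThread.start();
        metricsThread.join();

        new Thread(() -> rpcShutdown.complete(null), "fake-rpc-thread").start();
        // Prints "callback runs on: fake-rpc-thread"; swapping the completion
        // order would make the callback run on "fake-metrics-thread" instead.
    }
}
{code}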

Before the JobManager process exits, the JobManager shuts down multiple thread pools in parallel. Different shutdown orders result in different threads executing {color:#0747a6}code1-1{color}. If the last pool to finish consists only of daemon threads (for example, ioExecutor), {color:#0747a6}code1-1{color} is never executed, because the JVM exits as soon as no non-daemon threads remain. This is the root cause of the problem.
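
To make the failure mode concrete, here is another minimal, hypothetical sketch (class and thread names are made up, not Flink code): a whenComplete callback that would be run by a daemon thread is silently abandoned when the JVM shuts down because only daemon threads are left.

{code:java}
import java.util.concurrent.CompletableFuture;

// Hypothetical demo, not Flink code: if the future is completed by a daemon
// thread and no non-daemon thread is still alive, the JVM exits before the
// whenComplete callback (the analogue of code1-1) ever runs.
public class DaemonCallbackDemo {

    public static void main(String[] args) {
        CompletableFuture<Integer> terminationFuture = new CompletableFuture<>();

        // Simulates ioExecutor: a daemon thread that finishes its work
        // (e.g. the slow archive write) a few seconds later.
        Thread daemonWorker = new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            terminationFuture.complete(0);
        }, "fake-io-executor");
        daemonWorker.setDaemon(true);
        daemonWorker.start();

        // The analogue of code1-1: the callback that should call System.exit().
        terminationFuture.whenComplete((code, throwable) ->
            System.out.println("Terminating with exit code " + code));

        // The main thread returns here. Since only the daemon thread remains,
        // the JVM shuts down immediately (running only shutdown hooks, which is
        // why code2 still appears in the log) and the callback above never runs.
    }
}
{code}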

If the {color:#0747a6}org.apache.flink.runtime.util.ExecutorThreadFactory#newThread{color} method used by ioExecutor is changed from {color:#0747a6}code7{color} to {color:#0747a6}code8{color}, then even if we wait 10 seconds in {color:#0747a6}code5{color}, {color:#0747a6}code1-1{color} is still executed. In that case the thread executing code1-1 is {color:#0747a6}ForkJoinPool.commonPool-worker-57{color}.

// code7
{code:java}
@Override
public Thread newThread(Runnable runnable) {
	Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
	t.setDaemon(true);

	t.setPriority(threadPriority);

	// optional handler for uncaught exceptions
	if (exceptionHandler != null) {
		t.setUncaughtExceptionHandler(exceptionHandler);
	}

	return t;
}
{code}
// code8
{code:java}
@Override
public Thread newThread(Runnable runnable) {
	Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement());
	t.setDaemon(false);

	t.setPriority(threadPriority);

	// optional handler for uncaught exceptions
	if (exceptionHandler != null) {
		t.setUncaughtExceptionHandler(exceptionHandler);
	}

	return t;
}
{code}
I think {color:#0747a6}code1-1{color} needs to be executed by the main thread; otherwise, whenever a daemon thread pool happens to be the last to exit, {color:#0747a6}code1-1{color} will not be executed.
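
For reference, a sketch of the change I have in mind inside {color:#0747a6}ClusterEntrypoint#runClusterEntrypoint{color} (essentially the fix already proposed in the issue description quoted below): the main, non-daemon thread blocks on the termination future, so it is guaranteed to still be alive when the exit code is computed.

{code:java}
// Sketch: replace the whenComplete() registration (code1-1) with a blocking
// get() on the main thread, so the exit path no longer depends on which
// thread pool completes the termination future last.
int returnCode;
Throwable throwable = null;
try {
    returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
} catch (Throwable e) {
    throwable = e;
    returnCode = RUNTIME_FAILURE_RETURN_CODE;
}
LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
System.exit(returnCode);
{code}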

[~fly_in_gis] What do you think?


> In per-job mode, if the HDFS write is slow (about 5 seconds), the Flink job 
> archive file upload fails
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21274
>                 URL: https://issues.apache.org/jira/browse/FLINK-21274
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.1
>            Reporter: Jichao Wang
>            Priority: Critical
>         Attachments: 1.png, 2.png, Add wait 5 seconds in 
> org.apache.flink.runtime.history.FsJobArchivist#archiveJob.log, Not add wait 
> 5 seconds.log, application_1612404624605_0010-JobManager.log
>
>
> This is part of the configuration of my Flink HistoryServer (flink-conf.yaml), 
> which is also the configuration of my Flink client.
> {code:java}
> jobmanager.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> historyserver.archive.fs.dir: hdfs://hdfsHACluster/flink/completed-jobs/
> {code}
> I used {color:#0747a6}flink run -m yarn-cluster 
> /cloud/service/flink/examples/batch/WordCount.jar{color} to submit a 
> WordCount job to the Yarn cluster. Under normal circumstances, after the 
> job completes, the Flink job execution information is archived to HDFS and 
> then the JobManager process exits. However, when the archiving process 
> takes a long time (for example because the HDFS write speed is slow), the 
> job archive file upload fails.
> The specific reproduction method is as follows:
> Modify the 
> {color:#0747a6}org.apache.flink.runtime.history.FsJobArchivist#archiveJob{color}
>  method to wait 5 seconds before actually writing to HDFS (simulating a slow 
> write speed scenario).
> {code:java}
> public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive)
>     throws IOException {
>     try {
>         FileSystem fs = rootPath.getFileSystem();
>         Path path = new Path(rootPath, jobId.toString());
>         OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
>         try {
>             LOG.info("===========================Wait 5 seconds..");
>             Thread.sleep(5000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>         try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
>             ...  // Part of the code is omitted here
>         } catch (Exception e) {
>             fs.delete(path, false);
>             throw e;
>         }
>         LOG.info("Job {} has been archived at {}.", jobId, path);
>         return path;
>     } catch (IOException e) {
>         LOG.error("Failed to archive job.", e);
>         throw e;
>     }
> }
> {code}
> After making the above change, I could not find the corresponding job on 
> Flink's HistoryServer (refer to Figure 1.png and Figure 2.png).
> I then browsed the JobManager log on Yarn (see the attachment 
> application_1612404624605_0010-JobManager.log for details) and found that 
> the following log line is missing from the job log:
> {code:java}
> INFO entrypoint.ClusterEntrypoint: Terminating cluster entrypoint process 
> YarnJobClusterEntrypoint with exit code 0.{code}
> Normally, when the job exits cleanly, a log line like this is printed just 
> before {color:#0747a6}System.exit(returnCode){color} is executed.
> Since no exception appears in the JobManager log, the situation above 
> indicates that the JobManager ran up to a point at which no user (non-daemon) 
> threads were left in the process, so the program exited without completing 
> the normal shutdown sequence.
> Eventually I found that multiple services (for example ioExecutor, 
> metricRegistry, and commonRpcService) are shut down asynchronously in 
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#stopClusterServices{color},
>  and that the shutdown() method of metricRegistry in turn shuts down further 
> services (for example its executor); all of these shutdowns run asynchronously 
> and in parallel. If ioExecutor or that executor is the last to exit, the 
> problem described above occurs. The root cause is that the threads in these 
> two executors are daemon threads, while the Akka actor threads are user 
> threads.
> I hope to fix this bug with the following change. If it is determined that 
> this is a bug (the problem affects all versions above 1.9), please assign 
> the ticket to me, thank you.
> Only the 
> {color:#0747a6}org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runClusterEntrypoint{color}
>  method needs to be modified.
> After the fix:
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>    try {
>       clusterEntrypoint.startCluster();
>    } catch (ClusterEntrypointException e) {
>       LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>       System.exit(STARTUP_FAILURE_RETURN_CODE);
>    }
>    int returnCode;
>    Throwable throwable = null;
>    try {
>       returnCode = clusterEntrypoint.getTerminationFuture().get().processExitCode();
>    } catch (Throwable e) {
>       throwable = e;
>       returnCode = RUNTIME_FAILURE_RETURN_CODE;
>    }
>    LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
>    System.exit(returnCode);
> }
> {code}
>  Before fixing: 
> {code:java}
> public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
>    final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
>    try {
>       clusterEntrypoint.startCluster();
>    } catch (ClusterEntrypointException e) {
>       LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
>       System.exit(STARTUP_FAILURE_RETURN_CODE);
>    }
>    clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
>       final int returnCode;
>       if (throwable != null) {
>          returnCode = RUNTIME_FAILURE_RETURN_CODE;
>       } else {
>          returnCode = applicationStatus.processExitCode();
>       }
>       LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
>       System.exit(returnCode);
>    });
> }
> {code}
> The purpose of the modification is to ensure that the main thread exits last.


