[ 
https://issues.apache.org/jira/browse/FLINK-21768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Junfan Zhang updated FLINK-21768:
---------------------------------
    Description: 
h2. Why 
We encountered a problem when integrating Oozie with the Flink batch action.
Oozie uses a launcher job to start the Flink client, which submits the Flink job to Hadoop YARN.
When the Flink client finishes, Oozie reads its exit code to determine the job submission status and then performs follow-up actions.

How does Oozie catch {{System.exit()}}? It installs its own JDK {{SecurityManager}} ([Oozie related code link|https://github.com/apache/oozie/blob/f1e01a9e155692aa5632f4573ab1b3ebeab7ef45/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/security/LauncherSecurityManager.java#L24]).
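A minimal sketch of such an exit-trapping {{SecurityManager}} is shown below for illustration. It is not Oozie's actual code: {{ExitCapturingSecurityManager}} and {{ExitTrappedException}} are hypothetical names, and the sketch assumes the launcher simply keeps the most recent exit code. It calls {{checkExit}} directly instead of installing the manager, because {{System.setSecurityManager}} is deprecated for removal since Java 17 and is rejected by default at runtime from Java 18 onward.

{code:java}
import java.security.Permission;

// Hypothetical exception that carries the exit code that was vetoed.
class ExitTrappedException extends SecurityException {
    final int exitCode;

    ExitTrappedException(int exitCode) {
        super("Intercepted System.exit(" + exitCode + ")");
        this.exitCode = exitCode;
    }
}

// Hypothetical launcher-style SecurityManager: instead of letting the JVM exit,
// it records the requested exit code and throws, so the launcher keeps running.
@SuppressWarnings("removal")
class ExitCapturingSecurityManager extends SecurityManager {
    private boolean exitInvoked = false;
    private int exitCode = 0;

    @Override
    public void checkPermission(Permission perm) {
        // Permit every other operation; only exits are intercepted.
    }

    @Override
    public void checkExit(int status) {
        exitInvoked = true;
        exitCode = status; // assumption: a later exit attempt overwrites an earlier one
        throw new ExitTrappedException(status);
    }

    boolean isExitInvoked() {
        return exitInvoked;
    }

    int getExitCode() {
        return exitCode;
    }
}
{code}

Because {{checkExit}} throws rather than returning, the code that called {{System.exit}} keeps running with an exception in flight, so any enclosing {{catch (Throwable t)}} block will see it.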
 

When the Flink client finishes successfully, it calls {{System.exit(0)}} ([Flink related code link|https://github.com/apache/flink/blob/195298aea327b3f98d9852121f0f146368696300/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L1133]).
The JVM then consults Oozie's {{LauncherSecurityManager}}, whose {{checkExit()}} method intercepts the call and throws an exception instead of letting the JVM exit.
Because the {{System.exit(0)}} call sits inside a {{try}} block, the Flink client catches that exception as a {{Throwable}} and calls {{System.exit(31)}} ([related code link|https://github.com/apache/flink/blob/195298aea327b3f98d9852121f0f146368696300/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L1139]).
Oozie intercepts this second call as well and misjudges the status of a Flink job whose submission actually succeeded.
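The sequence above can be reproduced without Oozie or a real {{SecurityManager}}. The following is a simplified model, not Flink code: the hypothetical {{trappedExit}} function stands in for {{System.exit}} under an exit-trapping {{SecurityManager}} (it records the code, then throws), and the hypothetical {{clientMain}} mirrors only the {{try}}/{{catch (Throwable)}} shape of {{CliFrontend.main}}.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

class ExitTrapDemo {
    // Exit codes observed by the simulated launcher, in call order.
    static final List<Integer> observedExitCodes = new ArrayList<>();

    // Stand-in for System.exit under an exit-trapping SecurityManager:
    // the code is recorded, then the exit is vetoed with an exception.
    static final IntConsumer trappedExit = status -> {
        observedExitCodes.add(status);
        throw new SecurityException("Intercepted System.exit(" + status + ")");
    };

    // Mirrors the shape of CliFrontend.main: exit is called inside the try,
    // so the veto exception lands in the catch-all handler.
    static void clientMain(IntConsumer exit) {
        try {
            int retCode = 0; // the simulated submission succeeded
            exit.accept(retCode);
        } catch (Throwable t) {
            exit.accept(31); // this second exit is vetoed too and escapes
        }
    }

    public static void main(String[] args) {
        try {
            clientMain(trappedExit);
        } catch (SecurityException e) {
            // The veto of exit(31) escapes; a launcher would catch it here.
        }
        System.out.println(observedExitCodes); // prints [0, 31]
    }
}
{code}

Both codes reach the launcher, and the failure code 31 arrives last, even though the simulated submission succeeded.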

Admittedly this is a corner case, and in most scenarios it does not occur. It is still worth improving the client's exit logic.

I suspect the same problem may also affect other frameworks that use the Flink client to submit batch jobs, such as linkedin/azkaban and apache/airflow.


Flink related code:

{code:java}
    public static void main(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines =
                loadCustomCommandLines(configuration, configurationDirectory);

        try {
            final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            int retCode =
                    SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
            // If a SecurityManager vetoes this exit, its exception is caught below.
            System.exit(retCode);
        } catch (Throwable t) {
            final Throwable strippedThrowable =
                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
            // Reached even after a successful run whose System.exit(0) was vetoed.
            System.exit(31);
        }
    }
{code}
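One possible restructuring, shown here as an illustrative sketch rather than the change adopted for this ticket, is to compute the return code inside the {{try}} block and call the exit function exactly once, after it. {{SingleExitDemo}}, {{clientMainFixed}} and {{trappedExit}} are hypothetical names; {{trappedExit}} stands in for {{System.exit}} under an exit-trapping {{SecurityManager}}, and the fallback code 31 is taken from the {{CliFrontend}} snippet above.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.IntConsumer;

class SingleExitDemo {
    // Exit codes observed by the simulated launcher, in call order.
    static final List<Integer> observedExitCodes = new ArrayList<>();

    // Stand-in for System.exit under an exit-trapping SecurityManager.
    static final IntConsumer trappedExit = status -> {
        observedExitCodes.add(status);
        throw new SecurityException("Intercepted System.exit(" + status + ")");
    };

    // Fallback exit code for unexpected failures (31, as in CliFrontend).
    static final int FATAL_ERROR_CODE = 31;

    // The work decides the return code; exit is called once, outside the try,
    // so an exception thrown by the exit call itself cannot reach the catch-all.
    static void clientMainFixed(Callable<Integer> work, IntConsumer exit) {
        int retCode = FATAL_ERROR_CODE;
        try {
            retCode = work.call();
        } catch (Throwable t) {
            // A real client would log t here; retCode keeps the fallback value.
        }
        exit.accept(retCode);
    }

    public static void main(String[] args) {
        try {
            clientMainFixed(() -> 0, trappedExit);
        } catch (SecurityException e) {
            // The launcher catches the single veto and proceeds.
        }
        System.out.println(observedExitCodes); // prints [0]
    }
}
{code}

Because the exit call is outside the {{try}} block, a vetoed exit propagates to the launcher instead of being turned into a second exit with code 31, while genuine failures inside the {{try}} still end with code 31.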



> Optimize system.exit() logic of CliFrontend
> -------------------------------------------
>
>                 Key: FLINK-21768
>                 URL: https://issues.apache.org/jira/browse/FLINK-21768
>             Project: Flink
>          Issue Type: Improvement
>          Components: Command Line Client
>            Reporter: Junfan Zhang
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
