[ https://issues.apache.org/jira/browse/FLINK-16306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17046660#comment-17046660 ]

Kostas Kloudas commented on FLINK-16306:
----------------------------------------

I see. I agree that adding a sanity check that the session is up and running 
has value on its own. Is anyone going to work on this? For the other issue, I 
will ask for the proposed solution there.
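
For reference, a minimal sketch of what such a sanity check could look like, 
using the plain Hadoop {{YarnClient}} API directly. The actual integration 
point inside FlinkYarnSessionCli / YarnClusterDescriptor would of course 
differ, so treat this only as an illustration of the idea:
{code:java}
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public final class SessionStateCheck {

    /** Returns true iff the YARN application behind the session still runs. */
    static boolean isSessionRunning(String appIdString) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();
        try {
            ApplicationId appId = ApplicationId.fromString(appIdString);
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            YarnApplicationState state = report.getYarnApplicationState();
            // A KILLED/FINISHED/FAILED session must not receive new jobs.
            return state == YarnApplicationState.RUNNING;
        } finally {
            yarnClient.stop();
        }
    }

    public static void main(String[] args) throws Exception {
        // The application id as recorded in the YARN properties file.
        String appId = args[0];
        if (!isSessionRunning(appId)) {
            System.err.println("YARN session " + appId + " is not running; "
                    + "fail fast or fall back to a per-job cluster here.");
        }
    }
}
{code}
With a check along these lines run before the submission path is chosen, the 
client could either fail fast with a clear message or fall back to spinning up 
a per-job cluster, matching the behaviour seen when the property file is 
absent.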

> Validate YARN session state before job submission
> -------------------------------------------------
>
>                 Key: FLINK-16306
>                 URL: https://issues.apache.org/jira/browse/FLINK-16306
>             Project: Flink
>          Issue Type: Task
>          Components: Client / Job Submission
>    Affects Versions: 1.10.0
>            Reporter: Daniel Laszlo Magyar
>            Priority: Major
>
> To better handle YARN sessions that were not stopped properly, the state of 
> the session should be validated before job submission.
>  Currently, if {{execution.target: yarn-session}} is set in 
> {{conf/flink-conf.yaml}} and the hidden YARN property file 
> {{/tmp/.yarn-properties-root}} is present, FlinkYarnSessionCli tries to 
> submit the job regardless of the session’s state. 
> Apparently, the property file cannot be cleaned up automatically when the 
> session is killed externally, e.g. via {{yarn app -kill <appID>}}; this is 
> pointed out in the logs when running yarn-session.sh. Still, the state of 
> the application referenced by the file could be checked before submitting to 
> it. The current behaviour is inconsistent with the scenario in which the 
> YARN property file actually does get cleaned up, e.g. by manually deleting 
> the file: in that case a per-job cluster is spun up and the job is submitted 
> to that instead.
>  
> Replication steps:
>  • start a Flink YARN session via {{./bin/yarn-session.sh -d}}; this writes 
> the application id to {{/tmp/.yarn-properties-root}}
>  • set {{execution.target: yarn-session}} in 
> {{/etc/flink/conf/flink-conf.yaml}}
>  • kill the session via {{yarn app -kill <appID>}}
>  • try to submit a job, e.g. {{flink run -d -p 2 
> examples/streaming/WordCount.jar}}
> The logs clearly state that the FlinkYarnSessionCli tries to submit the job 
> to the killed application:
> {code:java}
> 20/02/26 13:34:26 ERROR yarn.YarnClusterDescriptor: The application application_1582646904843_0021 doesn't run anymore. It has previously completed with final status: KILLED
> ...
> 20/02/26 13:34:26 ERROR cli.CliFrontend: Error while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Couldn't retrieve Yarn cluster
>       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>       at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>       at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:709)
>       at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:258)
>       at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:940)
>       at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1014)
>       at java.base/java.security.AccessController.doPrivileged(Native Method)
>       at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
>       at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1014)
> Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve Yarn cluster
>       at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:365)
>       at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:122)
>       at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:63)
>       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1750)
>       at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
>       at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
>       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1637)
>       at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:96)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>       ... 11 more
> Caused by: java.lang.RuntimeException: The Yarn application application_1582646904843_0021 doesn't run anymore.
>       at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:352)
>       ... 23 more
> {code}
> If at this point the property file is deleted, e.g. by simply running 
> {{rm -f /tmp/.yarn-properties-root}}, and the job is resubmitted, a per-job 
> cluster is spun up. The same behaviour could be achieved without having to 
> delete the outdated property file.
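> A rough sketch of that fallback decision, assuming the hypothetical helpers 
> {{loadSessionAppId}}, {{isSessionRunning}}, {{submitToSession}} and 
> {{deployPerJobCluster}} (none of these are existing Flink API; they only 
> name the steps):
> {code:java}
> // Hypothetical flow, not existing Flink code; the helpers are placeholders.
> String appId = loadSessionAppId("/tmp/.yarn-properties-root");
> if (appId != null && isSessionRunning(appId)) {
>     // The session is alive: submit to it, as happens today.
>     submitToSession(appId);
> } else {
>     // The session is gone: behave as if the stale properties file were
>     // absent and spin up a per-job cluster instead of failing.
>     deployPerJobCluster();
> }
> {code}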
> CC: [~gyfora]



