github-actions[bot] commented on issue #16268:
URL: 
https://github.com/apache/dolphinscheduler/issues/16268#issuecomment-2205646787

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/dolphinscheduler/issues?q=is%3Aissue) and 
found no similar issues.
   
   
   ### What happened
   
   DolphinScheduler Version 3.2.1
   When stop a Process with Flink Task in CLUSTER Mode, dolphinscheduler will 
kill the flink job yarn application first. 
   YarnApplicationManager.execYarnKillCommand will be invoke, and the Yarn Kill 
Command Will failed with error cannot find command yarn 
   ```
   [ERROR] 2024-07-03 10:16:53.690 +0800 - Kill yarn application [[application_1714114694986_0041]] failed
   org.apache.dolphinscheduler.common.shell.AbstractShell$ExitCodeException: /tmp/dolphinscheduler/exec/process/default/13289909089664/13776450314784_11/190/200/application_1714114694986_0041.kill: line 6: yarn: command not found
   
        at 
org.apache.dolphinscheduler.common.shell.AbstractShell.runCommand(AbstractShell.java:205)
        at 
org.apache.dolphinscheduler.common.shell.AbstractShell.run(AbstractShell.java:118)
        at 
org.apache.dolphinscheduler.common.shell.ShellExecutor.execute(ShellExecutor.java:125)
        at 
org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:103)
        at 
org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:86)
        at 
org.apache.dolphinscheduler.common.utils.OSUtils.exeShell(OSUtils.java:345)
        at 
org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(OSUtils.java:334)
        at 
org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.execYarnKillCommand(YarnApplicationManager.java:89)
        at 
org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.killApplication(YarnApplicationManager.java:48)
        at 
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(ProcessUtils.java:192)
        at 
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.doKill(TaskInstanceKillOperationFunction.java:100)
        at 
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.operate(TaskInstanceKillOperationFunction.java:69)
        at 
org.apache.dolphinscheduler.server.worker.rpc.TaskInstanceOperatorImpl.killTask(TaskInstanceOperatorImpl.java:49)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.dolphinscheduler.extract.base.server.ServerMethodInvokerImpl.invoke(ServerMethodInvokerImpl.java:41)
        at 
org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler.lambda$processReceived$0(JdkDynamicServerHandler.java:108)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   [ERROR] 2024-07-03 10:16:53.691 +0800 - Cancel application failed: /tmp/dolphinscheduler/exec/process/default/13289909089664/13776450314784_11/190/200/application_1714114694986_0041.kill: line 6: yarn: command not found
   ```
   
   The root cause is that the kill script is executed by `sh`, not `bash`:

https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java#L69-L90

   `sh` does not load `/etc/profile` automatically, so `PATH` is not set up and the `yarn` command cannot be found.
   Sourcing `/etc/profile` in the script loads `PATH` before the yarn command is executed.
   Change the code like the following:
   ``` java
       private void execYarnKillCommand(String tenantCode, String commandFile,
                                        String cmd) throws Exception {
           StringBuilder sb = new StringBuilder();
           sb.append("#!/bin/sh\n");
           sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
           sb.append("cd $BASEDIR\n");
           // load PATH so the yarn command can be found;
           // "." is the POSIX-portable form of "source"
           sb.append(". /etc/profile\n");
           sb.append("\n\n");
           sb.append(cmd);

           File f = new File(commandFile);

           if (!f.exists()) {
               org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
                       StandardCharsets.UTF_8);
           }

           String runCmd = String.format("%s %s", Constants.SH, commandFile);
           runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
           log.info("kill cmd:{}", runCmd);
           org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
       }
   ```
   
   After making this change, `YarnApplicationManager.execYarnKillCommand` kills the YARN application successfully when stopping the Flink task.
   However, there are still errors in the logs:
   ```
   [ERROR] 2024-07-03 15:10:04.875 +0800 - Kill yarn application 
[[application_1714114694986_0057]] failed
   org.apache.dolphinscheduler.common.shell.AbstractShell$ExitCodeException: 
2024-07-03 15:10:04,274 INFO client.DefaultNoHARMFailoverProxyProvider: 
Connecting to ResourceManager at node1/172.0.107.57:8032
   2024-07-03 15:10:04,863 INFO impl.YarnClientImpl: Killed application 
application_1714114694986_0057
   
        at 
org.apache.dolphinscheduler.common.shell.AbstractShell.runCommand(AbstractShell.java:205)
        at 
org.apache.dolphinscheduler.common.shell.AbstractShell.run(AbstractShell.java:118)
        at 
org.apache.dolphinscheduler.common.shell.ShellExecutor.execute(ShellExecutor.java:125)
        at 
org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:103)
        at 
org.apache.dolphinscheduler.common.shell.ShellExecutor.execCommand(ShellExecutor.java:86)
        at 
org.apache.dolphinscheduler.common.utils.OSUtils.exeShell(OSUtils.java:345)
        at 
org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(OSUtils.java:334)
        at 
org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.execYarnKillCommand(YarnApplicationManager.java:89)
        at 
org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager.killApplication(YarnApplicationManager.java:48)
        at 
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(ProcessUtils.java:192)
        at 
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.doKill(TaskInstanceKillOperationFunction.java:100)
        at 
org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction.operate(TaskInstanceKillOperationFunction.java:69)
        at 
org.apache.dolphinscheduler.server.worker.rpc.TaskInstanceOperatorImpl.killTask(TaskInstanceOperatorImpl.java:49)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.dolphinscheduler.extract.base.server.ServerMethodInvokerImpl.invoke(ServerMethodInvokerImpl.java:41)
        at 
org.apache.dolphinscheduler.extract.base.server.JdkDynamicServerHandler.lambda$processReceived$0(JdkDynamicServerHandler.java:108)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   [ERROR] 2024-07-03 15:10:04.876 +0800 - Cancel application failed: 
2024-07-03 15:10:04,274 INFO client.DefaultNoHARMFailoverProxyProvider: 
Connecting to ResourceManager at node1/172.0.107.57:8032
   ```
   
   I started another Flink task, created the kill command file locally, and ran it manually.
   The command succeeded with the following output:
   ```
   2024-07-03 15:37:18,381 INFO client.DefaultNoHARMFailoverProxyProvider: 
Connecting to ResourceManager at node1/172.0.107.57:8032
   Killing application application_1714114694986_0059
   2024-07-03 15:37:18,883 INFO impl.YarnClientImpl: Killed application 
application_1714114694986_0059
   ```
   
   I am not sure why `AbstractShell` does not treat this as a successful execution, and why the INFO lines end up in the error stream.
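   For illustration (this is a standalone sketch, not DolphinScheduler code): the `yarn` CLI routes its log4j INFO lines to stderr, so a command can fill the error stream and still exit 0. Only the exit code reliably signals success, which suggests the stderr content alone should not be treated as failure:

   ``` java
   import java.io.BufferedReader;
   import java.io.InputStreamReader;

   public class StderrVsExitCode {
       public static void main(String[] args) throws Exception {
           // Stand-in for `yarn application -kill ...`: logs to stderr, exits 0.
           Process p = new ProcessBuilder("sh", "-c",
                   "echo 'INFO impl.YarnClientImpl: Killed application' >&2; exit 0")
                   .start();
           try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
               System.out.println("stderr: " + err.readLine());
           }
           System.out.println("exit code: " + p.waitFor()); // exit code: 0
       }
   }
   ```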
   
   ### What you expected to happen
   
   1. `YarnApplicationManager.execYarnKillCommand` kills the YARN application successfully, without any error.
   2. `AbstractYarnTask` keeps tracking the YARN application status; while the YARN application is still running, the task stays in the executing state.
   
   ### How to reproduce
   
   Currently, the Flink task does not implement tracking of the YARN application status.
   If you run a task in CLUSTER mode, the task succeeds and finishes as soon as the job is submitted to YARN.
   If you want to actually stop the Flink job, you have to go to the YARN application UI and stop it there.
   However, I want to track the YARN application status and end the task from DolphinScheduler, because we do not want to expose our YARN application web UI to our users.
   I added the following code in `FlinkTask` to monitor the YARN application status:
   ``` java
       @Override
       public void handle(TaskCallBack taskCallBack) throws TaskException {
           super.handle(taskCallBack);
           if (FlinkDeployMode.CLUSTER.equals(flinkParameters.getDeployMode())
                   || FlinkDeployMode.APPLICATION.equals(flinkParameters.getDeployMode())) {
               trackApplicationStatus();
           }
       }

       @Override
       public void trackApplicationStatus() throws TaskException {
           log.info("Flink Task Yarn Application Id is " + appIds);
           YarnClient yarnClient = YarnClient.createYarnClient();
           try {
               initialYarnClient(yarnClient);
               // appIds has the form application_<clusterTimestamp>_<sequence>
               String[] splitAppIds = appIds.split("_");
               ApplicationId applicationId = ApplicationId.newInstance(
                       Long.parseLong(splitAppIds[1]), Integer.parseInt(splitAppIds[2]));
               boolean yarnRunningFlag = true;
               while (yarnRunningFlag) {
                   ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
                   YarnApplicationState appState = appReport.getYarnApplicationState();
                   log.info("Yarn Application State is " + appState);
                   if (YarnApplicationState.FAILED.equals(appState)) {
                       yarnRunningFlag = false;
                       setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
                   } else if (YarnApplicationState.FINISHED.equals(appState)
                           || YarnApplicationState.KILLED.equals(appState)) {
                       yarnRunningFlag = false;
                   }
                   if (yarnRunningFlag) {
                       Thread.sleep(FlinkConstants.FLINK_YARN_TRACKING_SLEEP_MILLIS);
                   }
               }
           } catch (YarnException | IOException | NullPointerException e) {
               log.error("Failed to track application status", e);
               throw new RuntimeException("Failed to track application status");
           } catch (InterruptedException ex) {
               Thread.currentThread().interrupt();
               log.info("The current yarn task has been interrupted", ex);
               setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
               throw new TaskException("The current yarn task has been interrupted", ex);
           } finally {
               try {
                   // close() also stops the client
                   yarnClient.close();
               } catch (IOException e) {
                   log.error("Close Yarn Client Failed!", e);
               }
           }
       }

       private void initialYarnClient(YarnClient yarnClient) throws MalformedURLException {
           // pick up the cluster configuration from HADOOP_CONF_DIR
           YarnConfiguration conf = new YarnConfiguration();
           String confDir = System.getenv("HADOOP_CONF_DIR");
           conf.addResource(new File(confDir, "hdfs-site.xml").toURI().toURL());
           conf.addResource(new File(confDir, "core-site.xml").toURI().toURL());
           conf.addResource(new File(confDir, "yarn-site.xml").toURI().toURL());
           yarnClient.init(conf);
           yarnClient.start();
       }
   ```
   After adding this code, the process with the Flink task stays in the EXECUTING state, and when you stop the process, DolphinScheduler tries to kill the Flink YARN application via the kill command while stopping the task.
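
   As a side note on the tracking code above: the manual `appIds.split("_")` parsing assumes the exact `application_<clusterTimestamp>_<sequence>` id format. A small standalone sketch of just that parsing step (the helper name is hypothetical, not part of DolphinScheduler or Hadoop):

   ``` java
   public class YarnAppIdParsing {
       // Parse a YARN application id string of the form
       // application_<clusterTimestamp>_<sequence>.
       static long[] parse(String appId) {
           String[] parts = appId.split("_");
           if (parts.length != 3 || !"application".equals(parts[0])) {
               throw new IllegalArgumentException("not a YARN application id: " + appId);
           }
           // clusterTimestamp is a long; the sequence number fits in an int
           return new long[]{Long.parseLong(parts[1]), Integer.parseInt(parts[2])};
       }

       public static void main(String[] args) {
           long[] p = parse("application_1714114694986_0059");
           System.out.println(p[0] + " " + p[1]); // 1714114694986 59
       }
   }
   ```

   If your Hadoop version provides it, `ApplicationId.fromString(appIds)` may be preferable to hand-rolled splitting.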
   
   ### Anything else
   
   _No response_
   
   ### Version
   
   3.2.x
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)

