This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch 3.1.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.8-prepare by this push:
new 87481aa7e0 [bug][plugin]Fix: Correct the way to determine the yarn
queue in Flink CommandLine and SQL mode (#14237)
87481aa7e0 is described below
commit 87481aa7e01f49c91ab2ffb2dabc71a5a8f823b3
Author: ORuteMa <[email protected]>
AuthorDate: Mon Jun 12 11:04:39 2023 +0800
[bug][plugin]Fix: Correct the way to determine the yarn queue in Flink
CommandLine and SQL mode (#14237)
* Fix: Correct the way to determine the yarn queue in Flink CommandLine
* fix the yarn queue in sql mode && refine the code
* refine code
* remove unnecessary comment
* fix yarn queue properties
* remove redundant variable
---
.../plugin/task/flink/FlinkArgsUtils.java | 50 ++++++++++++++++------
.../plugin/task/flink/FlinkConstants.java | 3 +-
2 files changed, 39 insertions(+), 14 deletions(-)
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
index 69f7465a75..782b2f2f97 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
@@ -156,12 +156,9 @@ public class FlinkArgsUtils {
}
// yarn.application.queue
- String others = flinkParameters.getOthers();
- if (StringUtils.isEmpty(others) ||
!others.contains(FlinkConstants.FLINK_QUEUE)) {
- String queue = flinkParameters.getQueue();
- if (StringUtils.isNotEmpty(queue)) {
-
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE,
queue));
- }
+ String queue = flinkParameters.getQueue();
+ if (StringUtils.isNotEmpty(queue)) {
+
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE,
queue));
}
}
@@ -241,13 +238,6 @@ public class FlinkArgsUtils {
args.add(taskManagerMemory);
}
- if (StringUtils.isEmpty(others) ||
!others.contains(FlinkConstants.FLINK_QUEUE)) {
- String queue = flinkParameters.getQueue();
- if (StringUtils.isNotEmpty(queue)) { // -yqu
- args.add(FlinkConstants.FLINK_QUEUE);
- args.add(queue);
- }
- }
break;
case LOCAL:
break;
@@ -290,6 +280,40 @@ public class FlinkArgsUtils {
args.add(ParameterUtils.convertParameterPlaceholders(mainArgs,
ParamUtils.convert(paramsMap)));
}
+ // determine yarn queue
+ determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
return args;
}
+
+    /**
+     * Appends the YARN queue option that matches the deploy mode and Flink version.
+     *
+     * <p>In {@code CLUSTER} mode the option is {@link FlinkConstants#FLINK_QUEUE_FOR_TARGETS}
+     * when {@code flinkVersion} equals {@code FLINK_VERSION_AFTER_OR_EQUALS_1_12} or
+     * {@code FLINK_VERSION_AFTER_OR_EQUALS_1_13}, and {@link FlinkConstants#FLINK_QUEUE_FOR_MODE}
+     * otherwise. In {@code APPLICATION} mode it is always
+     * {@link FlinkConstants#FLINK_QUEUE_FOR_TARGETS}. Any other deploy mode adds nothing.
+     *
+     * @param args            command-line arguments being assembled; may be appended to
+     * @param flinkParameters task parameters that carry the queue and the extra options
+     * @param deployMode      deploy mode of the Flink task
+     * @param flinkVersion    Flink version selected for the task
+     */
+    private static void determinedYarnQueue(List<String> args, FlinkParameters flinkParameters,
+                                            FlinkDeployMode deployMode, String flinkVersion) {
+        switch (deployMode) {
+            case CLUSTER:
+                if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)
+                        || FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
+                    doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
+                } else {
+                    doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_MODE);
+                }
+                // Stop here: falling through into APPLICATION would add a second queue option
+                // and bypass the "already present in others" check made for the first one.
+                break;
+            case APPLICATION:
+                doAddQueue(args, flinkParameters, FlinkConstants.FLINK_QUEUE_FOR_TARGETS);
+                break;
+            default:
+                // No queue option is added for the remaining deploy modes.
+                break;
+        }
+    }
+
+    /**
+     * Appends the queue from {@code flinkParameters} to {@code args} in the form required by
+     * {@code option}, unless the extra options string already contains {@code option} or no
+     * queue is configured.
+     *
+     * <p>{@link FlinkConstants#FLINK_QUEUE_FOR_TARGETS} is added as a single
+     * {@code <option>=<queue>} argument; {@link FlinkConstants#FLINK_QUEUE_FOR_MODE} is added
+     * as two arguments, the option followed by the queue. Any other option adds nothing.
+     *
+     * @param args            command-line arguments being assembled; may be appended to
+     * @param flinkParameters task parameters that carry the queue and the extra options
+     * @param option          queue option to emit
+     */
+    private static void doAddQueue(List<String> args, FlinkParameters flinkParameters, String option) {
+        String others = flinkParameters.getOthers();
+        if (StringUtils.isNotEmpty(others) && others.contains(option)) {
+            // The extra options already mention this option, so leave them in charge.
+            return;
+        }
+        String queue = flinkParameters.getQueue();
+        if (StringUtils.isEmpty(queue)) {
+            return;
+        }
+        switch (option) {
+            case FlinkConstants.FLINK_QUEUE_FOR_TARGETS:
+                args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS + "=%s", queue));
+                // Without this break the FLINK_QUEUE_FOR_MODE arguments were appended as well.
+                break;
+            case FlinkConstants.FLINK_QUEUE_FOR_MODE:
+                args.add(FlinkConstants.FLINK_QUEUE_FOR_MODE);
+                args.add(queue);
+                break;
+            default:
+                break;
+        }
+    }
+
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
index de513e0b0d..6e8d51b2ce 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
@@ -48,7 +48,8 @@ public class FlinkConstants {
public static final String FLINK_EXECUTION_TARGET = "-t";
public static final String FLINK_YARN_SLOT = "-ys";
public static final String FLINK_APP_NAME = "-ynm";
- public static final String FLINK_QUEUE = "-yqu";
+ public static final String FLINK_QUEUE_FOR_MODE = "-yqu";
+ public static final String FLINK_QUEUE_FOR_TARGETS =
"-Dyarn.application.queue";
public static final String FLINK_TASK_MANAGE = "-yn";
public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
public static final String FLINK_TASK_MANAGE_MEM = "-ytm";