This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new 1a7a5a2  KYLIN-3904 Support more dependency jars in FlinkExecutable for FlinkCubeHFile
1a7a5a2 is described below

commit 1a7a5a2644b40b9a5cdd74a2c4b6c7e0ab78193e
Author: harveyyue <yw_yue...@126.com>
AuthorDate: Tue Mar 10 15:02:55 2020 +0800

    KYLIN-3904 Support more dependency jars in FlinkExecutable for FlinkCubeHFile
---
 .../org/apache/kylin/engine/flink/FlinkExecutable.java | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 5344cb5..5fb7190 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -48,7 +48,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
@@ -163,9 +165,6 @@ public class FlinkExecutable extends AbstractExecutable {
             String hadoopClasspathEnv = new File(hadoopConf).getParentFile().getAbsolutePath();
 
             String jobJar = config.getKylinJobJarPath();
-            if (StringUtils.isEmpty(jars)) {
-                jars = jobJar;
-            }
 
             String segmentID = this.getParam(FlinkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
             CubeSegment segment = cube.getSegmentById(segmentID);
@@ -205,16 +204,24 @@ public class FlinkExecutable extends AbstractExecutable {
                     //flink on yarn specific option (pattern : -yn 1)
                     if (configOptionKey.startsWith("-y") && !entry.getValue().isEmpty()) {
                         sb.append(" ").append(configOptionKey).append(" ").append(entry.getValue());
-                    } else if(!configOptionKey.startsWith("-y")){
+                    } else if (!configOptionKey.startsWith("-y")) {
                         //flink on yarn specific option (pattern : -yD taskmanager.network.memory.min=536346624)
                         sb.append(" ").append(configOptionKey).append("=").append(entry.getValue());
                     }
                 }
             }
 
+            if (StringUtils.isNotBlank(jars)) {
+                String[] splitJars = jars.split(",\\s*");
+                Set<String> setJars = new HashSet();
+                setJars.addAll(Arrays.asList(splitJars));
+                for (String jar : setJars) {
+                    sb.append(String.format(Locale.ROOT, " -C file://%s", jar));
+                }
+            }
             sb.append(" -c org.apache.kylin.common.util.FlinkEntry -p %s %s %s ");
             final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, hadoopClasspathEnv,
-                    KylinConfig.getFlinkHome(), parallelism, jars, formatArgs());
+                    KylinConfig.getFlinkHome(), parallelism, jobJar, formatArgs());
             logger.info("cmd: " + cmd);
             final ExecutorService executorService = Executors.newSingleThreadExecutor();
             final CliCommandExecutor exec = new CliCommandExecutor();