This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ee7f2f435c4b82ac6b3867d4d6505de0de43c24f Author: GinaZhai <na.z...@kyligence.io> AuthorDate: Mon Jul 9 13:27:15 2018 +0800 KYLIN-3382 Yarn job link wasn't displayed when job is running Signed-off-by: shaofengshi <shaofeng...@apache.org> --- .../org/apache/kylin/job/common/PatternedLogger.java | 11 +++++++++++ .../org/apache/kylin/engine/spark/SparkExecutable.java | 17 ++++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java index 8be5d02..0f1bd2d 100644 --- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java +++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java @@ -33,6 +33,7 @@ import com.google.common.collect.Maps; */ public class PatternedLogger extends BufferedLogger { private final Map<String, String> info = Maps.newHashMap(); + ILogListener listener; private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager"); private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)"); @@ -56,6 +57,11 @@ public class PatternedLogger extends BufferedLogger { super(wrappedLogger); } + public PatternedLogger(Logger wrappedLogger, ILogListener listener) { + super(wrappedLogger); + this.listener = listener; + } + @Override public void log(String message) { super.log(message); @@ -128,6 +134,7 @@ public class PatternedLogger extends BufferedLogger { if (matcher.find()) { String trackingUrl = matcher.group(1); info.put(ExecutableConstants.YARN_APP_URL, trackingUrl); + listener.onLogEvent(info); } } @@ -135,4 +142,8 @@ public class PatternedLogger extends BufferedLogger { return info; } + public interface ILogListener{ + void onLogEvent(Map<String, String> info); + } + } diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index dfaa2e1..6a1c2c6 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.util.Shell; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.ResourceTool; @@ -135,8 +136,13 @@ public class SparkExecutable extends AbstractExecutable { } StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append( - "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry "); + if (Shell.osType == Shell.OSType.OS_TYPE_WIN) { + stringBuilder.append( + "set HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry "); + } else { + stringBuilder.append( + "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry "); + } Map<String, String> sparkConfs = config.getSparkConfigOverride(); for (Map.Entry<String, String> entry : sparkConfs.entrySet()) { @@ -149,7 +155,12 @@ public class SparkExecutable extends AbstractExecutable { formatArgs()); logger.info("cmd: " + cmd); CliCommandExecutor exec = new CliCommandExecutor(); - PatternedLogger patternedLogger = new PatternedLogger(logger); + PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() { + @Override + public void onLogEvent(Map<String, String> info) { + getManager().addJobInfo(getId(), info); + } + }); exec.execute(cmd, patternedLogger); Map<String, String> joblogInfo = patternedLogger.getInfo();