This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 22f2d6e79e0bba5f2101f1d1c42922cd36c001b4 Author: shaofengshi <shaofeng...@apache.org> AuthorDate: Tue Jul 10 15:57:37 2018 +0800 KYLIN-3382 code refine --- .../apache/kylin/job/common/PatternedLogger.java | 100 +++++++-------------- .../apache/kylin/engine/spark/SparkExecutable.java | 9 +- 2 files changed, 38 insertions(+), 71 deletions(-) diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java index 0f1bd2d..99a1aa9 100644 --- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java +++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java @@ -23,6 +23,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.kylin.common.util.BufferedLogger; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.job.constant.ExecutableConstants; import org.slf4j.Logger; @@ -33,7 +34,7 @@ import com.google.common.collect.Maps; */ public class PatternedLogger extends BufferedLogger { private final Map<String, String> info = Maps.newHashMap(); - ILogListener listener; + ILogListener listener = null; private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager"); private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)"); @@ -53,6 +54,22 @@ public class PatternedLogger extends BufferedLogger { private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)"); + private static Map<Pattern, Pair<String, Integer>> patternMap = Maps.newHashMap(); // key is pattern, value is a pair, the first is property key, second is pattern index. 
+ + static { + patternMap.put(PATTERN_APP_ID, new Pair(ExecutableConstants.YARN_APP_ID, 1)); + patternMap.put(PATTERN_APP_URL, new Pair(ExecutableConstants.YARN_APP_URL, 1)); + patternMap.put(PATTERN_JOB_ID, new Pair(ExecutableConstants.MR_JOB_ID, 1)); + patternMap.put(PATTERN_HDFS_BYTES_WRITTEN, new Pair(ExecutableConstants.HDFS_BYTES_WRITTEN, 1)); + patternMap.put(PATTERN_SOURCE_RECORDS_COUNT, new Pair(ExecutableConstants.SOURCE_RECORDS_COUNT, 1)); + patternMap.put(PATTERN_SOURCE_RECORDS_SIZE, new Pair(ExecutableConstants.SOURCE_RECORDS_SIZE, 1)); + patternMap.put(PATTERN_HIVE_APP_ID_URL, new Pair(ExecutableConstants.YARN_APP_URL, 2)); + patternMap.put(PATTERN_HIVE_APP_ID_URL_2, new Pair(ExecutableConstants.YARN_APP_URL, 1)); + patternMap.put(PATTERN_HIVE_BYTES_WRITTEN, new Pair(ExecutableConstants.HDFS_BYTES_WRITTEN, 2)); + patternMap.put(PATTERN_SPARK_APP_ID, new Pair(ExecutableConstants.YARN_APP_ID, 1)); + patternMap.put(PATTERN_SPARK_APP_URL, new Pair(ExecutableConstants.YARN_APP_URL, 1)); + } + public PatternedLogger(Logger wrappedLogger) { super(wrappedLogger); } @@ -65,85 +82,30 @@ public class PatternedLogger extends BufferedLogger { @Override public void log(String message) { super.log(message); - Matcher matcher = PATTERN_APP_ID.matcher(message); - if (matcher.find()) { - String appId = matcher.group(1); - info.put(ExecutableConstants.YARN_APP_ID, appId); - } - - matcher = PATTERN_APP_URL.matcher(message); - if (matcher.find()) { - String appTrackingUrl = matcher.group(1); - info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl); - } - - matcher = PATTERN_JOB_ID.matcher(message); - if (matcher.find()) { - String mrJobID = matcher.group(1); - info.put(ExecutableConstants.MR_JOB_ID, mrJobID); - } - - matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message); - if (matcher.find()) { - String hdfsWritten = matcher.group(1); - info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten); - } - - matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message); - if 
(matcher.find()) { - String sourceCount = matcher.group(1); - info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount); - } - - matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message); - if (matcher.find()) { - String sourceSize = matcher.group(1); - info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize); - } - - // hive - matcher = PATTERN_HIVE_APP_ID_URL.matcher(message); - if (matcher.find()) { - String jobId = matcher.group(1); - String trackingUrl = matcher.group(2); - info.put(ExecutableConstants.MR_JOB_ID, jobId); - info.put(ExecutableConstants.YARN_APP_URL, trackingUrl); - } else { - matcher = PATTERN_HIVE_APP_ID_URL_2.matcher(message); + Matcher matcher; + for (Pattern pattern : patternMap.keySet()) { + matcher = pattern.matcher(message); if (matcher.find()) { - String jobId = matcher.group(1); - info.put(ExecutableConstants.YARN_APP_ID, jobId); + String key = patternMap.get(pattern).getFirst(); + int index = patternMap.get(pattern).getSecond(); + String value = matcher.group(index); + info.put(key, value); + if (listener != null) { + listener.onLogEvent(key, info); + } + break; } } - matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message); - if (matcher.find()) { - // String hdfsRead = matcher.group(1); - String hdfsWritten = matcher.group(2); - info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten); - } - - // spark - matcher = PATTERN_SPARK_APP_ID.matcher(message); - if (matcher.find()) { - String app_id = matcher.group(1); - info.put(ExecutableConstants.YARN_APP_ID, app_id); - } - - matcher = PATTERN_SPARK_APP_URL.matcher(message); - if (matcher.find()) { - String trackingUrl = matcher.group(1); - info.put(ExecutableConstants.YARN_APP_URL, trackingUrl); - listener.onLogEvent(info); - } } public Map<String, String> getInfo() { return info; } + // Listener interface on notify pattern matched event public interface ILogListener{ - void onLogEvent(Map<String, String> info); + void onLogEvent(String infoKey, Map<String, String> info); } } 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index 6a1c2c6..1c64119 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -157,12 +157,17 @@ public class SparkExecutable extends AbstractExecutable { CliCommandExecutor exec = new CliCommandExecutor(); PatternedLogger patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() { @Override - public void onLogEvent(Map<String, String> info) { - getManager().addJobInfo(getId(), info); + public void onLogEvent(String infoKey, Map<String, String> info) { + // while the command is still running, only publish when the YARN application id or its tracking URL was just captured + if (ExecutableConstants.YARN_APP_ID.equals(infoKey) + || ExecutableConstants.YARN_APP_URL.equals(infoKey)) { + getManager().addJobInfo(getId(), info); + } } }); exec.execute(cmd, patternedLogger); + // update all properties Map<String, String> joblogInfo = patternedLogger.getInfo(); readCounters(joblogInfo); getManager().addJobInfo(getId(), joblogInfo);