This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new a565fda  KYLIN-3427 Bug fix for converting to HFile in Spark
a565fda is described below

commit a565fdae47a818ff0062de4ac6cbdde733cd6bf5
Author: shaofengshi <shaofeng...@apache.org>
AuthorDate: Tue Jul 10 17:29:20 2018 +0800

    KYLIN-3427 Bug fix for converting to HFile in Spark
---
 .../src/main/java/org/apache/kylin/cube/CubeManager.java    |  2 +-
 .../java/org/apache/kylin/job/common/PatternedLogger.java   |  7 +++----
 .../java/org/apache/kylin/engine/spark/SparkExecutable.java | 13 +++++++++----
 .../apache/kylin/storage/hbase/steps/SparkCubeHFile.java    |  4 +++-
 4 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3f4c576..3ff0160 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -801,7 +801,7 @@ public class CubeManager implements IRealizationProvider {
         if (force == false) {
             List<String> emptySegment = Lists.newArrayList();
             for (CubeSegment seg : mergingSegments) {
-                if (seg.getSizeKB() == 0) {
+                if (seg.getSizeKB() == 0 && seg.getInputRecords() == 0) {
                     emptySegment.add(seg.getName());
                 }
             }
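With this change, a merging segment is treated as empty only when both its size and its input record count are zero, presumably so a segment whose size metric was never recorded (for example when the byte counter of a Spark build step was not captured, as addressed below) no longer blocks a merge. A minimal standalone sketch of the new check, using a made-up Segment stand-in rather than the real CubeSegment:

    import java.util.ArrayList;
    import java.util.List;

    public class EmptySegmentCheckSketch {
        // Simplified stand-in for CubeSegment with only the two fields the check reads.
        static class Segment {
            final String name;
            final long sizeKB;
            final long inputRecords;

            Segment(String name, long sizeKB, long inputRecords) {
                this.name = name;
                this.sizeKB = sizeKB;
                this.inputRecords = inputRecords;
            }
        }

        public static void main(String[] args) {
            List<Segment> mergingSegments = new ArrayList<>();
            // Segment with data but a missing/zero size metric: no longer flagged as empty.
            mergingSegments.add(new Segment("20180101000000_20180102000000", 0, 1000000));
            // Genuinely empty segment: still flagged.
            mergingSegments.add(new Segment("20180102000000_20180103000000", 0, 0));

            List<String> emptySegment = new ArrayList<>();
            for (Segment seg : mergingSegments) {
                if (seg.sizeKB == 0 && seg.inputRecords == 0) {
                    emptySegment.add(seg.name);
                }
            }
            System.out.println(emptySegment); // [20180102000000_20180103000000]
        }
    }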
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 99a1aa9..73e7c56 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -34,18 +34,18 @@ import com.google.common.collect.Maps;
 
  */
 public class PatternedLogger extends BufferedLogger {
     private final Map<String, String> info = Maps.newHashMap();
-    ILogListener listener = null;
+    private ILogListener listener = null;
 
     private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
     private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
     private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
     private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
     private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
-    private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
+    private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) (?:HD|MAPR)FS Write");
 
     // hive
     private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
-    private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
+    private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) (?:HD|MAPR)FS Write: (\\d+) SUCCESS");
     private static final Pattern PATTERN_HIVE_APP_ID_URL_2 = Pattern.compile("Executing on YARN cluster with App id (.*?)");
 
@@ -53,7 +53,6 @@ public class PatternedLogger extends BufferedLogger {
 
     private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)");
     private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
-
     private static Map<Pattern, Pair<String, Integer>> patternMap = Maps.newHashMap(); // key is pattern, value is a pair, the first is property key, second is pattern index.
 
     static {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 1c64119..90442a4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -55,7 +55,7 @@ public class SparkExecutable extends AbstractExecutable {
     private static final String CLASS_NAME = "className";
     private static final String JARS = "jars";
     private static final String JOB_ID = "jobId";
-    private String counter_save_as;
+    private static final String COUNTER_SAVE_AS = "CounterSaveAs";
 
     public void setClassName(String className) {
         this.setParam(CLASS_NAME, className);
@@ -70,7 +70,11 @@ public class SparkExecutable extends AbstractExecutable {
     }
 
     public void setCounterSaveAs(String value) {
-        counter_save_as = value;
+        this.setParam(COUNTER_SAVE_AS, value);
+    }
+
+    public String getCounterSaveAs() {
+        return getParam(COUNTER_SAVE_AS);
     }
 
     private String formatArgs() {
@@ -80,7 +84,7 @@ public class SparkExecutable extends AbstractExecutable {
             tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
             if (entry.getKey().equals(CLASS_NAME)) {
                 stringBuilder.insert(0, tmp);
-            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)) {
+            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID) || entry.getKey().equals(COUNTER_SAVE_AS)) {
                 // JARS is for spark-submit, not for app
                 continue;
             } else {
@@ -160,7 +164,7 @@ public class SparkExecutable extends AbstractExecutable {
             public void onLogEvent(String infoKey, Map<String, String> info) {
                 // only care two properties here
                 if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
-                        || ExecutableConstants.YARN_APP_ID.equals(infoKey)) {
+                        || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
                     getManager().addJobInfo(getId(), info);
                 }
             }
@@ -219,6 +223,7 @@ public class SparkExecutable extends AbstractExecutable {
     }
 
     private void readCounters(final Map<String, String> info) {
+        String counter_save_as = getCounterSaveAs();
         if (counter_save_as != null) {
            String[] saveAsNames = counter_save_as.split(",");
            saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_COUNT), saveAsNames, 0, info);
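A rough, self-contained sketch (not the real SparkExecutable) of what the formatArgs() change means: the counter-save-as value now lives in the executable's params under COUNTER_SAVE_AS and, like the jars and jobId params, is consumed by the job engine rather than forwarded to the Spark application's argument string. Storing it with setParam/getParam instead of a plain field also means readCounters() can presumably still read it back when the executable is rebuilt from its persisted params. Param names and values below are illustrative:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class FormatArgsSketch {
        public static void main(String[] args) {
            // Illustrative params; the real keys are CLASS_NAME, JARS, JOB_ID, COUNTER_SAVE_AS plus app args.
            Map<String, String> params = new LinkedHashMap<>();
            params.put("className", "org.apache.kylin.storage.hbase.steps.SparkCubeHFile");
            params.put("jars", "/tmp/extra.jar");
            params.put("jobId", "job-0001");
            params.put("CounterSaveAs", "rawRecordCount,rawSizeBytes");
            params.put("segmentId", "seg-0001");

            StringBuilder appArgs = new StringBuilder();
            for (Map.Entry<String, String> entry : params.entrySet()) {
                String key = entry.getKey();
                // These params steer spark-submit / the job engine, not the Spark app itself.
                if (key.equals("jars") || key.equals("jobId") || key.equals("CounterSaveAs")) {
                    continue;
                }
                appArgs.append("-").append(key).append(" ").append(entry.getValue()).append(" ");
            }
            System.out.println(appArgs.toString().trim());
            // -className org.apache.kylin.storage.hbase.steps.SparkCubeHFile -segmentId seg-0001
        }
    }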
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index b2571ae..a23156c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -239,8 +239,10 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         hfilerdd2.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class,
                 HFileOutputFormat2.class, job.getConfiguration());
+        // output the data size to console, job engine will parse and save the metric
+        // please note: this mechanism won't work when spark.submit.deployMode=cluster
         System.out.println("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
 
-        // deleteHDFSMeta(metaUrl);
+        deleteHDFSMeta(metaUrl);
     }
 
     private List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc) throws IOException {
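The new comment describes the hand-off: the Spark driver prints the byte count to stdout in exactly the shape PatternedLogger's PATTERN_HDFS_BYTES_WRITTEN expects, and the job engine parses it from the captured output. A minimal sketch of that match, with a made-up byte count; as the comment notes, with spark.submit.deployMode=cluster the driver's stdout is not captured by the submitting process, so the line would never reach the parser:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ConsoleMetricSketch {
        // Same expression as PATTERN_HDFS_BYTES_WRITTEN in PatternedLogger above.
        private static final Pattern HDFS_BYTES_WRITTEN =
                Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");

        public static void main(String[] args) {
            // The line SparkCubeHFile writes to the driver's stdout (byte count is illustrative).
            String consoleLine = "HDFS: Number of bytes written=123456789";
            Matcher m = HDFS_BYTES_WRITTEN.matcher(consoleLine);
            if (m.find()) {
                // The job engine would record this value as the step's bytes-written metric.
                System.out.println("bytes written = " + m.group(1));
            }
        }
    }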