This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new a565fda  KYLIN-3427 Bug fix for converting to HFile in Spark
a565fda is described below

commit a565fdae47a818ff0062de4ac6cbdde733cd6bf5
Author: shaofengshi <shaofeng...@apache.org>
AuthorDate: Tue Jul 10 17:29:20 2018 +0800

    KYLIN-3427 Bug fix for converting to HFile in Spark
---
 .../src/main/java/org/apache/kylin/cube/CubeManager.java    |  2 +-
 .../java/org/apache/kylin/job/common/PatternedLogger.java   |  7 +++----
 .../java/org/apache/kylin/engine/spark/SparkExecutable.java | 13 +++++++++----
 .../apache/kylin/storage/hbase/steps/SparkCubeHFile.java    |  4 +++-
 4 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3f4c576..3ff0160 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -801,7 +801,7 @@ public class CubeManager implements IRealizationProvider {
             if (force == false) {
                 List<String> emptySegment = Lists.newArrayList();
                 for (CubeSegment seg : mergingSegments) {
-                    if (seg.getSizeKB() == 0) {
+                    if (seg.getSizeKB() == 0 && seg.getInputRecords() == 0) {
                         emptySegment.add(seg.getName());
                     }
                 }
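
The CubeManager change tightens the empty-segment check used when validating a
non-forced merge: a segment built by the Spark engine can report 0 KB simply
because the size counter was never captured, while still holding data. A
standalone sketch of the new predicate, using a hypothetical Segment stub
rather than Kylin's CubeSegment:

    public class EmptySegmentCheck {
        // hypothetical stub carrying just the two metrics the check reads
        static final class Segment {
            final long sizeKB;
            final long inputRecords;
            Segment(long sizeKB, long inputRecords) {
                this.sizeKB = sizeKB;
                this.inputRecords = inputRecords;
            }
        }

        // a segment is treated as empty only when BOTH metrics are zero
        static boolean isEmpty(Segment seg) {
            return seg.sizeKB == 0 && seg.inputRecords == 0;
        }

        public static void main(String[] args) {
            System.out.println(isEmpty(new Segment(0, 0)));    // true: truly empty
            System.out.println(isEmpty(new Segment(0, 1000))); // false: size metric lost, data present
        }
    }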
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
index 99a1aa9..73e7c56 100644
--- a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
+++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java
@@ -34,18 +34,18 @@ import com.google.common.collect.Maps;
  */
 public class PatternedLogger extends BufferedLogger {
     private final Map<String, String> info = Maps.newHashMap();
-    ILogListener listener = null;
+    private ILogListener listener = null;
 
     private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
     private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
     private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
     private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
     private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
-    private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
+    private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) (?:HD|MAPR)FS Write");
 
     // hive
     private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
-    private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
+    private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) (?:HD|MAPR)FS Write: (\\d+) SUCCESS");

     private static final Pattern PATTERN_HIVE_APP_ID_URL_2 = Pattern.compile("Executing on YARN cluster with App id  (.*?)");
 
@@ -53,7 +53,6 @@ public class PatternedLogger extends BufferedLogger {
     private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)");
     private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)");
 
-
     private static Map<Pattern, Pair<String, Integer>> patternMap = Maps.newHashMap(); // key is pattern, value is a pair, the first is property key, second is pattern index.
 
     static {
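
The two pattern fixes above matter on MapR, where the filesystem prefix in job
logs is MAPRFS rather than HDFS: the old patterns accepted either prefix for
the "Read" group but hard-coded "HDFS Write", so they never matched MapR
output. A self-contained check of the corrected pattern against a made-up
sample line:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class PatternCheck {
        public static void main(String[] args) {
            // pattern copied from the fixed PATTERN_HIVE_BYTES_WRITTEN above
            Pattern fixed = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) (?:HD|MAPR)FS Write: (\\d+) SUCCESS");
            // illustrative MapR-style log line, not real job output
            Matcher m = fixed.matcher("MAPRFS Read: 12345 MAPRFS Write: 67890 SUCCESS");
            if (m.find()) {
                System.out.println("read="  + m.group(1));  // 12345
                System.out.println("write=" + m.group(2));  // 67890
            }
        }
    }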
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 1c64119..90442a4 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -55,7 +55,7 @@ public class SparkExecutable extends AbstractExecutable {
     private static final String CLASS_NAME = "className";
     private static final String JARS = "jars";
     private static final String JOB_ID = "jobId";
-    private String counter_save_as;
+    private static final String COUNTER_SAVE_AS = "CounterSaveAs";
 
     public void setClassName(String className) {
         this.setParam(CLASS_NAME, className);
@@ -70,7 +70,11 @@ public class SparkExecutable extends AbstractExecutable {
     }
 
     public void setCounterSaveAs(String value) {
-        counter_save_as = value;
+        this.setParam(COUNTER_SAVE_AS, value);
+    }
+
+    public String getCounterSaveAs() {
+        return getParam(COUNTER_SAVE_AS);
     }
 
     private String formatArgs() {
@@ -80,7 +84,7 @@ public class SparkExecutable extends AbstractExecutable {
             tmp.append("-").append(entry.getKey()).append(" 
").append(entry.getValue()).append(" ");
             if (entry.getKey().equals(CLASS_NAME)) {
                 stringBuilder.insert(0, tmp);
-            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)) {
+            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID) || entry.getKey().equals(COUNTER_SAVE_AS)) {
                 // JARS is for spark-submit, not for app
                 continue;
             } else {
@@ -160,7 +164,7 @@ public class SparkExecutable extends AbstractExecutable {
                 public void onLogEvent(String infoKey, Map<String, String> info) {
                     // only care two properties here
                     if (ExecutableConstants.YARN_APP_ID.equals(infoKey)
-                            || ExecutableConstants.YARN_APP_ID.equals(infoKey)) {
+                            || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
                         getManager().addJobInfo(getId(), info);
                     }
                 }
@@ -219,6 +223,7 @@ public class SparkExecutable extends AbstractExecutable {
     }
 
     private void readCounters(final Map<String, String> info) {
+        String counter_save_as = getCounterSaveAs();
         if (counter_save_as != null) {
             String[] saveAsNames = counter_save_as.split(",");
             saveCounterAs(info.get(ExecutableConstants.SOURCE_RECORDS_COUNT), saveAsNames, 0, info);
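
Two fixes land in SparkExecutable. First, the counter_save_as instance field
becomes a job param, presumably because Kylin persists an executable's params
and rebuilds the instance from them when the job actually runs, so a plain
field set at job-definition time would be null by execution time. Second,
onLogEvent had a copy-paste bug: it tested YARN_APP_ID twice, so YARN_APP_URL
events were never recorded. A minimal sketch of the param-backed pattern, with
a hypothetical stand-in for AbstractExecutable's param store:

    import java.util.HashMap;
    import java.util.Map;

    public class ParamBackedValue {
        private static final String COUNTER_SAVE_AS = "CounterSaveAs";
        // in Kylin the params map is serialized with job metadata, so values
        // stored here survive the executable being re-instantiated
        private final Map<String, String> params = new HashMap<>();

        public void setCounterSaveAs(String value) {
            params.put(COUNTER_SAVE_AS, value);
        }

        public String getCounterSaveAs() {
            return params.get(COUNTER_SAVE_AS);
        }
    }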
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index b2571ae..a23156c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -239,8 +239,10 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         hfilerdd2.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class,
                 HFileOutputFormat2.class, job.getConfiguration());
 
+        // output the data size to console; the job engine will parse and save the metric
+        // please note: this mechanism won't work when spark.submit.deployMode=cluster
         System.out.println("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
-        // deleteHDFSMeta(metaUrl);
+        deleteHDFSMeta(metaUrl);
     }
 
     private List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc) throws IOException {
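
Here the println is the transport for the metric: the job engine tails the
driver's console output and recovers the byte count with
PATTERN_HDFS_BYTES_WRITTEN from PatternedLogger above, which is also why the
added comment warns that spark.submit.deployMode=cluster breaks the mechanism
(the driver's stdout is then not on the job engine's host). The previously
commented-out deleteHDFSMeta call is re-enabled as well, so the temporary
metadata dump is cleaned up after the job. A sketch of both sides of the
handshake, with an illustrative byte count:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ConsoleMetricHandshake {
        public static void main(String[] args) {
            // driver side: stand-in for jobListener.metrics.getBytesWritten()
            long bytesWritten = 1048576L;
            String line = "HDFS: Number of bytes written=" + bytesWritten;

            // job engine side: same regex as PATTERN_HDFS_BYTES_WRITTEN above
            Pattern p = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
            Matcher m = p.matcher(line);
            if (m.find()) {
                System.out.println("parsed bytes written: " + m.group(1));
            }
        }
    }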
