Repository: kylin
Updated Branches:
  refs/heads/master 9f1029a76 -> d2e96bda0


KYLIN-1752 Add an option to fail cube build job when source table is empty

Signed-off-by: shaofengshi <shaofeng...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d2e96bda
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d2e96bda
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d2e96bda

Branch: refs/heads/master
Commit: d2e96bda0a5848815cf4430746276bc9e896bad7
Parents: 9f1029a
Author: gaodayue <gaoda...@meituan.com>
Authored: Fri Jun 3 18:17:43 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue Jun 7 15:09:29 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 ++++
 .../source/hive/CreateFlatHiveTableStep.java    | 22 +++++++++++++-------
 2 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d2e96bda/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 68e3b6c..7664c66 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -363,6 +363,10 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.job.cmd.extra.args");
     }
 
+    public boolean isEmptySegmentAllowed() {
+        return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true"));
+    }
+
     public String getOverrideHiveTableLocation(String table) {
         return getOptional("hive.table.location." + table.toUpperCase());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d2e96bda/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index e9b9994..443de99 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -23,9 +23,11 @@ import java.io.InputStream;
 public class CreateFlatHiveTableStep extends AbstractExecutable {
     private final BufferedLogger stepLogger = new BufferedLogger(logger);
 
-    private long readRowCountFromFile(Path file) throws IOException {
-        FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration());
-        InputStream in = fs.open(file);
+    private long readRowCountFromFile() throws IOException {
+        Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
+
+        FileSystem fs = FileSystem.get(rowCountFile.toUri(), HadoopUtil.getCurrentConfiguration());
+        InputStream in = fs.open(rowCountFile);
         try {
             String content = IOUtils.toString(in);
             return Long.valueOf(content.trim()); // strip the '\n' character
@@ -35,9 +37,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
         }
     }
 
-    private int determineNumReducer(KylinConfig config) throws IOException {
-        Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
-        long rowCount = readRowCountFromFile(rowCountFile);
+    private int determineNumReducer(KylinConfig config, long rowCount) throws IOException {
         int mapperInputRows = config.getHadoopJobMapperInputRows();
 
         int numReducers = Math.round(rowCount / ((float) mapperInputRows));
@@ -78,8 +78,14 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();
         try {
-
-            int numReducers = determineNumReducer(config);
+            long rowCount = readRowCountFromFile();
+            if (!config.isEmptySegmentAllowed() && rowCount == 0) {
+                stepLogger.log("Detect upstream hive table is empty, " +
+                        "fail the job because \"kylin.job.allow.empty.segment\" = \"false\"");
+                return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+            }
+
+            int numReducers = determineNumReducer(config, rowCount);
             createFlatHiveTable(config, numReducers);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
 

Reply via email to