Repository: kylin Updated Branches: refs/heads/master 9f1029a76 -> d2e96bda0
KYLIN-1752 Add an option to fail cube build job when source table is empty Signed-off-by: shaofengshi <shaofeng...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d2e96bda Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d2e96bda Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d2e96bda Branch: refs/heads/master Commit: d2e96bda0a5848815cf4430746276bc9e896bad7 Parents: 9f1029a Author: gaodayue <gaoda...@meituan.com> Authored: Fri Jun 3 18:17:43 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Jun 7 15:09:29 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 ++++ .../source/hive/CreateFlatHiveTableStep.java | 22 +++++++++++++------- 2 files changed, 18 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d2e96bda/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 68e3b6c..7664c66 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -363,6 +363,10 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.job.cmd.extra.args"); } + public boolean isEmptySegmentAllowed() { + return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true")); + } + public String getOverrideHiveTableLocation(String table) { return getOptional("hive.table.location." + table.toUpperCase()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/d2e96bda/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java index e9b9994..443de99 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java @@ -23,9 +23,11 @@ import java.io.InputStream; public class CreateFlatHiveTableStep extends AbstractExecutable { private final BufferedLogger stepLogger = new BufferedLogger(logger); - private long readRowCountFromFile(Path file) throws IOException { - FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration()); - InputStream in = fs.open(file); + private long readRowCountFromFile() throws IOException { + Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0"); + + FileSystem fs = FileSystem.get(rowCountFile.toUri(), HadoopUtil.getCurrentConfiguration()); + InputStream in = fs.open(rowCountFile); try { String content = IOUtils.toString(in); return Long.valueOf(content.trim()); // strip the '\n' character @@ -35,9 +37,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { } } - private int determineNumReducer(KylinConfig config) throws IOException { - Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0"); - long rowCount = readRowCountFromFile(rowCountFile); + private int determineNumReducer(KylinConfig config, long rowCount) throws IOException { int mapperInputRows = config.getHadoopJobMapperInputRows(); int numReducers = Math.round(rowCount / ((float) mapperInputRows)); @@ -78,8 +78,14 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { KylinConfig config = getCubeSpecificConfig(); try { - - int numReducers = determineNumReducer(config); + long rowCount = readRowCountFromFile(); + if (!config.isEmptySegmentAllowed() && rowCount == 0) { + stepLogger.log("Detect upstream hive table is empty, " + + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\""); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + + int numReducers = determineNumReducer(config, rowCount); createFlatHiveTable(config, numReducers); return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());