#3314 Fix data size reporting in the first two build steps
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c6e45a76 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c6e45a76 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c6e45a76 Branch: refs/heads/master Commit: c6e45a76dac1f9428308622558fe3608e11b66df Parents: 5aa8033 Author: Sheng Zhang <sheng.zh...@kyligence.io> Authored: Tue Dec 5 19:03:34 2017 +0800 Committer: Sheng Zhang <sheng.zh...@kyligence.io> Committed: Wed Dec 27 16:59:51 2017 +0800 ---------------------------------------------------------------------- .../source/hive/CreateFlatHiveTableStep.java | 30 +++++++++++++++++++- .../apache/kylin/source/hive/HiveMRInput.java | 8 ++++++ 2 files changed, 37 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/c6e45a76/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java index 48e7686..891b090 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java @@ -18,7 +18,14 @@ package org.apache.kylin.source.hive; import java.io.IOException; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.regex.Matcher; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HiveCmdBuilder; import org.apache.kylin.common.util.Pair; @@ -26,6 +33,7 @@ import org.apache.kylin.cube.CubeInstance; 
import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.common.PatternedLogger; +import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -39,6 +47,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableStep.class); protected final PatternedLogger stepLogger = new PatternedLogger(logger); + private static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';"); protected void createFlatHiveTable(KylinConfig config) throws IOException { final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); @@ -51,12 +60,31 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { stepLogger.log(cmd); Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); - getManager().addJobInfo(getId(), stepLogger.getInfo()); + Map<String, String> info = stepLogger.getInfo(); + + //get the flat Hive table size + Matcher matcher = HDFS_LOCATION.matcher(cmd); + if (matcher.find()) { + String hiveFlatTableHdfsUrl = matcher.group(1); + long size = getFileSize(hiveFlatTableHdfsUrl); + info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size); + logger.info("HDFS_Bytes_Writen: " + size); + } + getManager().addJobInfo(getId(), info); if (response.getFirst() != 0) { throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst()); } } + private long getFileSize(String hdfsUrl) throws IOException { + Configuration configuration = new Configuration(); + Path path = new Path(hdfsUrl); + FileSystem fs = path.getFileSystem(configuration); + ContentSummary contentSummary = fs.getContentSummary(path); + long length = contentSummary.getLength(); + return length; + } + private 
KylinConfig getCubeSpecificConfig() { String cubeName = CubingExecutableUtil.getCubeName(getParams()); CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); http://git-wip-us.apache.org/repos/asf/kylin/blob/c6e45a76/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 3671266..0b23121 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -281,6 +281,12 @@ public class HiveMRInput implements IMRInput { return hiveClient.getHiveTableRows(database, table); } + private long getDataSize(String database, String table) throws Exception { + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + long size = hiveClient.getHiveTableMeta(database, table).fileSize; + return size; + } + private void redistributeTable(KylinConfig config, int numReducers) throws IOException { final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride()); @@ -346,6 +352,8 @@ public class HiveMRInput implements IMRInput { stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers); redistributeTable(config, numReducers); + long dataSize = getDataSize(database, tableName); + getManager().addJobInfo(getId(), ExecutableConstants.HDFS_BYTES_WRITTEN, "" + dataSize); return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); } catch (Exception e) {