KYLIN-2808, fix table not found exception
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ff4f634f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ff4f634f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ff4f634f Branch: refs/heads/2622-2764 Commit: ff4f634fe5ad5f96b1506a2ab368eb701f62c5dc Parents: 1fff421 Author: Cheng Wang <cheng.w...@kyligence.io> Authored: Fri Aug 25 19:10:19 2017 +0800 Committer: æ <cheng.w...@kyligence.io> Committed: Sat Aug 26 00:19:08 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/source/hive/HiveMRInput.java | 41 +++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ff4f634f/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 71a7152..096134c 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -62,10 +62,12 @@ public class HiveMRInput implements IMRInput { @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class); - + public static String getTableNameForHCat(TableDesc table) { String tableName = (table.isView()) ? table.getMaterializedName() : table.getName(); - return String.format("%s.%s", table.getDatabase(), tableName).toUpperCase(); + String database = (table.isView()) ? 
KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + : table.getDatabase(); + return String.format("%s.%s", database, tableName).toUpperCase(); } @Override @@ -139,17 +141,18 @@ public class HiveMRInput implements IMRInput { @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); - final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig(); + final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName) + .getConfig(); final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); // create flat table first addStepPhase1_DoCreateFlatTable(jobFlow); - + // then count and redistribute if (cubeConfig.isHiveRedistributeEnabled()) { jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName)); } - + // special for hive addStepPhase1_DoMaterializeLookupTable(jobFlow); } @@ -158,14 +161,14 @@ public class HiveMRInput implements IMRInput { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); final String jobWorkingDir = getJobWorkingDir(jobFlow); - + jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName)); } protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) { final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); final String jobWorkingDir = getJobWorkingDir(jobFlow); - + AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir); if (task != null) { jobFlow.addTask(task); @@ -186,7 +189,8 @@ public class HiveMRInput implements IMRInput { return step; } - private ShellExecutable 
createLookupHiveViewMaterializationStep(String hiveInitStatements, String jobWorkingDir) { + private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, + String jobWorkingDir) { ShellExecutable step = new ShellExecutable(); step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); @@ -215,21 +219,25 @@ public class HiveMRInput implements IMRInput { if (lookUpTableDesc.isView()) { StringBuilder createIntermediateTableHql = new StringBuilder(); createIntermediateTableHql.append("DROP TABLE IF EXISTS " + intermediate + ";\n"); - createIntermediateTableHql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n"); + createIntermediateTableHql + .append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n"); createIntermediateTableHql.append("LOCATION '" + jobWorkingDir + "/" + intermediate + "';\n"); - createIntermediateTableHql.append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + ";\n"); + createIntermediateTableHql + .append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + ";\n"); hiveCmdBuilder.addStatement(createIntermediateTableHql.toString()); hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";"; } } - hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0, hiveViewIntermediateTables.length() - 1); + hiveViewIntermediateTables = hiveViewIntermediateTables.substring(0, + hiveViewIntermediateTables.length() - 1); step.setCmd(hiveCmdBuilder.build()); return step; } - private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, String cubeName) { + private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, + String cubeName) { //from hive to hive final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir); @@ -318,10 +326,12 @@ public class HiveMRInput implements IMRInput { logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount); if (rowCount == 0) { if (!config.isEmptySegmentAllowed()) { - stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\""); + stepLogger.log("Detect upstream hive table is empty, " + + "fail the job because \"kylin.job.allow-empty-segment\" = \"false\""); return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); } else { - return new ExecuteResult(ExecuteResult.State.SUCCEED, "Row count is 0, no need to redistribute"); + return new ExecuteResult(ExecuteResult.State.SUCCEED, + "Row count is 0, no need to redistribute"); } } @@ -398,7 +408,8 @@ public class HiveMRInput implements IMRInput { config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); output.append("Hive table " + hiveTable + " is dropped. \n"); rmdirOnHDFS(getExternalDataPath()); - output.append("Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n"); + output.append( + "Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n"); } return output.toString(); }