Port KYLIN-2012 to the new interface introduced in KYLIN-2125
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b502a174 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b502a174 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b502a174 Branch: refs/heads/yang21-cdh5.7 Commit: b502a17414468652b75f84994fb371a767784a47 Parents: 6fc1c86 Author: Hongbin Ma <mahong...@apache.org> Authored: Wed Oct 26 14:04:56 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Wed Oct 26 18:49:34 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/DeployUtil.java | 5 ++- .../kylin/source/hive/BeelineHiveClient.java | 6 +-- .../source/hive/HiveSourceTableLoader.java | 32 +++++++-------- .../apache/kylin/source/hive/SchemaChecker.java | 41 ++++++++------------ 4 files changed, 38 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b502a174/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index 8c64f91..0734f4f 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -38,8 +38,9 @@ import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.hive.HiveClient; +import org.apache.kylin.source.hive.HiveClientFactory; import org.apache.kylin.source.hive.HiveCmdBuilder; +import org.apache.kylin.source.hive.IHiveClient; import org.apache.kylin.source.kafka.TimedJsonStreamParser; import org.apache.maven.model.Model; import 
org.apache.maven.model.io.xpp3.MavenXpp3Reader; @@ -205,7 +206,7 @@ public class DeployUtil { String tableFileDir = temp.getParent(); temp.delete(); - HiveClient hiveClient = new HiveClient(); + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); // create hive tables hiveClient.executeHQL("CREATE DATABASE IF NOT EXISTS EDW"); hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(TABLE_CAL_DT.toUpperCase()))); http://git-wip-us.apache.org/repos/asf/kylin/blob/b502a174/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java index 0fbc39b..b027f2e 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java @@ -205,9 +205,9 @@ public class BeelineHiveClient implements IHiveClient { public static void main(String[] args) throws SQLException { - //BeelineHiveClient loader = new BeelineHiveClient("-n root --hiveconf hive.security.authorization.sqlstd.confwhitelist.append='mapreduce.job.*|dfs.*' -u 'jdbc:hive2://sandbox:10000'"); - BeelineHiveClient loader = new BeelineHiveClient(StringUtils.join(args, " ")); - HiveTableMeta hiveTableMeta = loader.getHiveTableMeta("default", "events"); + BeelineHiveClient loader = new BeelineHiveClient("-n root --hiveconf hive.security.authorization.sqlstd.confwhitelist.append='mapreduce.job.*|dfs.*' -u 'jdbc:hive2://sandbox:10000'"); + //BeelineHiveClient loader = new BeelineHiveClient(StringUtils.join(args, " ")); + HiveTableMeta hiveTableMeta = loader.getHiveTableMeta("default", "test_kylin_fact_part"); System.out.println(hiveTableMeta); loader.close(); } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/b502a174/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java index 346d278..1335ec3 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.UUID; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; @@ -34,8 +35,10 @@ import org.apache.kylin.metadata.model.TableDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; /** @@ -49,27 +52,25 @@ public class HiveSourceTableLoader { @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class); - public static final String OUTPUT_SURFIX = "json"; - public static final String TABLE_FOLDER_NAME = "table"; - public static final String TABLE_EXD_FOLDER_NAME = "table_exd"; - public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException { - Map<String, Set<String>> db2tables = Maps.newHashMap(); - for (String table : hiveTables) { - String[] parts = HadoopUtil.parseHiveTableName(table); - Set<String> set = db2tables.get(parts[0]); - if (set == null) { - set = Sets.newHashSet(); - db2tables.put(parts[0], set); - } - 
set.add(parts[1]); + SetMultimap<String, String> db2tables = LinkedHashMultimap.create(); + for (String fullTableName : hiveTables) { + String[] parts = HadoopUtil.parseHiveTableName(fullTableName); + db2tables.put(parts[0], parts[1]); + } + + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + SchemaChecker checker = new SchemaChecker(hiveClient, MetadataManager.getInstance(config), CubeManager.getInstance(config)); + for (Map.Entry<String, String> entry : db2tables.entries()) { + SchemaChecker.CheckResult result = checker.allowReload(entry.getKey(), entry.getValue()); + result.raiseExceptionWhenInvalid(); } // extract from hive Set<String> loadedTables = Sets.newHashSet(); for (String database : db2tables.keySet()) { - List<String> loaded = extractHiveTables(database, db2tables.get(database), config); + List<String> loaded = extractHiveTables(database, db2tables.get(database), hiveClient); loadedTables.addAll(loaded); } @@ -82,12 +83,11 @@ public class HiveSourceTableLoader { metaMgr.removeTableExd(hiveTable); } - private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException { + private static List<String> extractHiveTables(String database, Set<String> tables, IHiveClient hiveClient) throws IOException { List<String> loadedTables = Lists.newArrayList(); MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); for (String tableName : tables) { - IHiveClient hiveClient = HiveClientFactory.getHiveClient(); HiveTableMeta hiveTableMeta; try { hiveTableMeta = hiveClient.getHiveTableMeta(database, tableName); http://git-wip-us.apache.org/repos/asf/kylin/blob/b502a174/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java b/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java 
index 0c5ccd0..9fcf99e 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java @@ -9,8 +9,6 @@ import java.util.Set; import javax.annotation.Nullable; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; @@ -28,7 +26,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class SchemaChecker { - private final HiveClient hiveClient; + private final IHiveClient hiveClient; private final MetadataManager metadataManager; private final CubeManager cubeManager; @@ -69,23 +67,16 @@ public class SchemaChecker { } } - SchemaChecker(HiveClient hiveClient, MetadataManager metadataManager, CubeManager cubeManager) { + SchemaChecker(IHiveClient hiveClient, MetadataManager metadataManager, CubeManager cubeManager) { this.hiveClient = checkNotNull(hiveClient, "hiveClient is null"); this.metadataManager = checkNotNull(metadataManager, "metadataManager is null"); this.cubeManager = checkNotNull(cubeManager, "cubeManager is null"); } - private List<FieldSchema> fetchSchema(String dbName, String tblName) throws Exception { - List<FieldSchema> fields = Lists.newArrayList(); - fields.addAll(hiveClient.getHiveTableFields(dbName, tblName)); - - Table table = hiveClient.getHiveTable(dbName, tblName); - List<FieldSchema> partitionFields = table.getPartitionKeys(); - if (partitionFields != null) { - fields.addAll(partitionFields); - } - - return fields; + private List<HiveTableMeta.HiveTableColumnMeta> fetchSchema(String dbName, String tblName) throws Exception { + List<HiveTableMeta.HiveTableColumnMeta> columnMetas = Lists.newArrayList(); + columnMetas.addAll(hiveClient.getHiveTableMeta(dbName, tblName).allColumns); + return columnMetas; } private 
List<CubeInstance> findCubeByTable(final String fullTableName) { @@ -110,12 +101,12 @@ public class SchemaChecker { return ImmutableList.copyOf(relatedCubes); } - private boolean isColumnCompatible(ColumnDesc column, FieldSchema field) { - if (!column.getName().equalsIgnoreCase(field.getName())) { + private boolean isColumnCompatible(ColumnDesc column, HiveTableMeta.HiveTableColumnMeta field) { + if (!column.getName().equalsIgnoreCase(field.name)) { return false; } - String typeStr = field.getType(); + String typeStr = field.dataType; // kylin uses double internally for float, see HiveSourceTableLoader.java // TODO should this normalization to be in DataType class ? if ("float".equalsIgnoreCase(typeStr)) { @@ -141,7 +132,7 @@ public class SchemaChecker { * @param fieldsMap current hive schema of `table` * @return true if all columns used in `cube` has compatible schema with `fieldsMap`, false otherwise */ - private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc table, Map<String, FieldSchema> fieldsMap) { + private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc table, Map<String, HiveTableMeta.HiveTableColumnMeta> fieldsMap) { Set<ColumnDesc> usedColumns = Sets.newHashSet(); for (TblColRef col : cube.getAllColumns()) { usedColumns.add(col.getColumnDesc()); @@ -150,7 +141,7 @@ public class SchemaChecker { List<String> violateColumns = Lists.newArrayList(); for (ColumnDesc column : table.getColumns()) { if (usedColumns.contains(column)) { - FieldSchema field = fieldsMap.get(column.getName()); + HiveTableMeta.HiveTableColumnMeta field = fieldsMap.get(column.getName()); if (field == null || !isColumnCompatible(column, field)) { violateColumns.add(column.getName()); } @@ -166,7 +157,7 @@ public class SchemaChecker { * @param fields current table metadata in hive * @return true if only new columns are appended in hive, false otherwise */ - private boolean checkAllColumnsInTableDesc(TableDesc table, List<FieldSchema> fields) { + private 
boolean checkAllColumnsInTableDesc(TableDesc table, List<HiveTableMeta.HiveTableColumnMeta> fields) { if (table.getColumnCount() > fields.size()) { return false; } @@ -188,15 +179,15 @@ public class SchemaChecker { return CheckResult.validOnFirstLoad(fullTableName); } - List<FieldSchema> currentFields; - Map<String, FieldSchema> currentFieldsMap = Maps.newHashMap(); + List<HiveTableMeta.HiveTableColumnMeta> currentFields; + Map<String, HiveTableMeta.HiveTableColumnMeta> currentFieldsMap = Maps.newHashMap(); try { currentFields = fetchSchema(dbName, tblName); } catch (Exception e) { return CheckResult.invalidOnFetchSchema(fullTableName, e); } - for (FieldSchema field : currentFields) { - currentFieldsMap.put(field.getName().toUpperCase(), field); + for (HiveTableMeta.HiveTableColumnMeta field : currentFields) { + currentFieldsMap.put(field.name.toUpperCase(), field); } List<String> issues = Lists.newArrayList();