http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java new file mode 100644 index 0000000..a87ddd8 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.service; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.String.format; + +import java.util.List; +import java.util.Set; + +import javax.annotation.Nullable; + +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TblColRef; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class TableSchemaUpdateChecker { + private final MetadataManager metadataManager; + private final CubeManager cubeManager; + + static class CheckResult { + private final boolean valid; + private final String reason; + + private CheckResult(boolean valid, String reason) { + this.valid = valid; + this.reason = reason; + } + + void raiseExceptionWhenInvalid() { + if (!valid) { + throw new RuntimeException(reason); + } + } + + static CheckResult validOnFirstLoad(String tableName) { + return new CheckResult(true, format("Table '%s' hasn't been loaded before", tableName)); + } + + static CheckResult validOnCompatibleSchema(String tableName) { + return new CheckResult(true, format("Table '%s' is compatible with all existing cubes", tableName)); + } + + static CheckResult invalidOnFetchSchema(String tableName, Exception e) { + return new CheckResult(false, format("Failed to fetch metadata of '%s': %s", tableName, e.getMessage())); + } + + static CheckResult invalidOnIncompatibleSchema(String tableName, List<String> reasons) { + StringBuilder buf = new StringBuilder(); + for (String reason : reasons) { + buf.append("- ").append(reason).append("\n"); + } + + return new CheckResult(false, format("Found %d issue(s) with '%s':%n%s Please disable and purge related cube(s) first", reasons.size(), tableName, buf.toString())); + } + } + + TableSchemaUpdateChecker(MetadataManager metadataManager, CubeManager cubeManager) { + this.metadataManager = checkNotNull(metadataManager, "metadataManager is null"); + this.cubeManager = checkNotNull(cubeManager, "cubeManager is null"); + } + + private List<CubeInstance> findCubeByTable(final String fullTableName) { + Iterable<CubeInstance> relatedCubes = Iterables.filter(cubeManager.listAllCubes(), new Predicate<CubeInstance>() { + @Override + public boolean apply(@Nullable CubeInstance cube) { + if (cube == null || cube.allowBrokenDescriptor()) { + return false; + } + DataModelDesc model = cube.getModel(); + if (model == null) + return false; + return model.containsTable(fullTableName); + } + }); + + return ImmutableList.copyOf(relatedCubes); + } + + private boolean isColumnCompatible(ColumnDesc column, ColumnDesc newCol) { + if (!column.getName().equalsIgnoreCase(newCol.getName())) { + return false; + } + + if (column.getType().isIntegerFamily()) { + // OLAPTable.listSourceColumns converts some integer columns to bigint, + // therefore strict type comparison won't work. + // changing from one integer type to another should be fine. + return newCol.getType().isIntegerFamily(); + } else if (column.getType().isNumberFamily()) { + // Both are float/double should be fine. + return newCol.getType().isNumberFamily(); + } else { + // only compare base type name, changing precision or scale should be fine + return column.getTypeName().equals(newCol.getTypeName()); + } + } + + /** + * check whether all columns used in `cube` has compatible schema in current hive schema denoted by `fieldsMap`. + * @param cube cube to check, must use `table` in its model + * @param origTable kylin's table metadata + * @param fieldsMap current hive schema of `table` + * @return true if all columns used in `cube` has compatible schema with `fieldsMap`, false otherwise + */ + private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc origTable, TableDesc newTable) { + Set<ColumnDesc> usedColumns = Sets.newHashSet(); + for (TblColRef col : cube.getAllColumns()) { + usedColumns.add(col.getColumnDesc()); + } + + List<String> violateColumns = Lists.newArrayList(); + for (ColumnDesc column : origTable.getColumns()) { + if (!column.isComputedColumnn() && usedColumns.contains(column)) { + ColumnDesc newCol = newTable.findColumnByName(column.getName()); + if (newCol == null || !isColumnCompatible(column, newCol)) { + violateColumns.add(column.getName()); + } + } + } + return violateColumns; + } + + /** + * check whether all columns in `table` are still in `fields` and have the same index as before. + * + * @param table kylin's table metadata + * @param fields current table metadata in hive + * @return true if only new columns are appended in hive, false otherwise + */ + private boolean checkAllColumnsInTableDesc(TableDesc origTable, TableDesc newTable) { + if (origTable.getColumnCount() > newTable.getColumnCount()) { + return false; + } + + ColumnDesc[] columns = origTable.getColumns(); + for (int i = 0; i < columns.length; i++) { + if (!isColumnCompatible(columns[i], newTable.getColumns()[i])) { + return false; + } + } + return true; + } + + public CheckResult allowReload(TableDesc newTableDesc) { + final String fullTableName = newTableDesc.getIdentity(); + + TableDesc existing = metadataManager.getTableDesc(fullTableName); + if (existing == null) { + return CheckResult.validOnFirstLoad(fullTableName); + } + + List<String> issues = Lists.newArrayList(); + for (CubeInstance cube : findCubeByTable(fullTableName)) { + String modelName = cube.getModel().getName(); + + // if user reloads a fact table used by cube, then all used columns must match current schema + if (cube.getModel().isFactTable(fullTableName)) { + TableDesc factTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); + List<String> violateColumns = checkAllColumnsInCube(cube, factTable, newTableDesc); + if (!violateColumns.isEmpty()) { + issues.add(format("Column %s used in cube[%s] and model[%s], but changed in hive", violateColumns, cube.getName(), modelName)); + } + } + + // if user reloads a lookup table used by cube, only append column(s) are allowed, all existing columns + // must be the same (except compatible type changes) + if (cube.getModel().isLookupTable(fullTableName)) { + TableDesc lookupTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); + if (!checkAllColumnsInTableDesc(lookupTable, newTableDesc)) { + issues.add(format("Table '%s' is used as Lookup Table in cube[%s] and model[%s], but changed in hive", lookupTable.getIdentity(), cube.getName(), modelName)); + } + } + } + + if (issues.isEmpty()) { + return CheckResult.validOnCompatibleSchema(fullTableName); + } + return CheckResult.invalidOnIncompatibleSchema(fullTableName, issues); + } +}
http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index 9f9b541..919dad4 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -25,12 +25,13 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; @@ -44,11 +45,8 @@ import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.response.TableDescResponse; -import org.apache.kylin.source.hive.HiveClientFactory; -import org.apache.kylin.source.hive.HiveSourceTableLoader; -import org.apache.kylin.source.hive.IHiveClient; -import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob; -import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob; +import org.apache.kylin.source.ISourceMetadataExplorer; +import org.apache.kylin.source.SourceFactory; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +55,11 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.stereotype.Component; +import com.google.common.base.Preconditions; +import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.SetMultimap; + @Component("tableService") public class TableService extends BasicService { @@ -90,17 +93,76 @@ public class TableService extends BasicService { } public TableDesc getTableDescByName(String tableName, boolean withExt) { - TableDesc table = getMetadataManager().getTableDesc(tableName); - if(withExt){ + TableDesc table = getMetadataManager().getTableDesc(tableName); + if (withExt) { table = cloneTableDesc(table); } return table; } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN) - public String[] loadHiveTablesToProject(String[] tables, String project) throws IOException { - Set<String> loaded = HiveSourceTableLoader.loadHiveTables(tables, getConfig()); - String[] result = (String[]) loaded.toArray(new String[loaded.size()]); + public String[] loadHiveTablesToProject(String[] tables, String project) throws Exception { + // de-dup + SetMultimap<String, String> db2tables = LinkedHashMultimap.create(); + for (String fullTableName : tables) { + String[] parts = HadoopUtil.parseHiveTableName(fullTableName); + db2tables.put(parts[0].toUpperCase(), parts[1].toUpperCase()); + } + + // load all tables first + List<Pair<TableDesc, TableExtDesc>> allMeta = Lists.newArrayList(); + ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer(); + for (Map.Entry<String, String> entry : db2tables.entries()) { + Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(entry.getKey(), entry.getValue()); + TableDesc tableDesc = pair.getFirst(); + Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey())); + Preconditions.checkState(tableDesc.getName().equals(entry.getValue())); + Preconditions.checkState(tableDesc.getIdentity().equals(entry.getKey() + "." + entry.getValue())); + TableExtDesc extDesc = pair.getSecond(); + Preconditions.checkState(tableDesc.getIdentity().equals(extDesc.getName())); + allMeta.add(pair); + } + + // do schema check + MetadataManager metaMgr = MetadataManager.getInstance(getConfig()); + CubeManager cubeMgr = CubeManager.getInstance(getConfig()); + TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr); + for (Pair<TableDesc, TableExtDesc> pair : allMeta) { + TableDesc tableDesc = pair.getFirst(); + TableSchemaUpdateChecker.CheckResult result = checker.allowReload(tableDesc); + result.raiseExceptionWhenInvalid(); + } + + // save table meta + List<String> saved = Lists.newArrayList(); + for (Pair<TableDesc, TableExtDesc> pair : allMeta) { + TableDesc tableDesc = pair.getFirst(); + TableExtDesc extDesc = pair.getSecond(); + + TableDesc origTable = metaMgr.getTableDesc(tableDesc.getIdentity()); + if (origTable == null) { + tableDesc.setUuid(UUID.randomUUID().toString()); + tableDesc.setLastModified(0); + } else { + tableDesc.setUuid(origTable.getUuid()); + tableDesc.setLastModified(origTable.getLastModified()); + } + + TableExtDesc origExt = metaMgr.getTableExt(tableDesc.getIdentity()); + if (origExt == null) { + extDesc.setUuid(UUID.randomUUID().toString()); + extDesc.setLastModified(0); + } else { + extDesc.setUuid(origExt.getUuid()); + extDesc.setLastModified(origExt.getLastModified()); + } + + metaMgr.saveTableExt(extDesc); + metaMgr.saveSourceTable(tableDesc); + saved.add(tableDesc.getIdentity()); + } + + String[] result = (String[]) saved.toArray(new String[saved.size()]); syncTableToProject(result, project); return result; } @@ -197,9 +259,8 @@ public class TableService extends BasicService { * @throws Exception */ public List<String> getHiveDbNames() throws Exception { - IHiveClient hiveClient = HiveClientFactory.getHiveClient(); - List<String> results = hiveClient.getHiveDbNames(); - return results; + ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer(); + return explr.listDatabases(); } /** @@ -209,9 +270,8 @@ public class TableService extends BasicService { * @throws Exception */ public List<String> getHiveTableNames(String database) throws Exception { - IHiveClient hiveClient = HiveClientFactory.getHiveClient(); - List<String> results = hiveClient.getHiveTableNames(database); - return results; + ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer(); + return explr.listTables(database); } private TableDescResponse cloneTableDesc(TableDesc table) { @@ -241,7 +301,6 @@ public class TableService extends BasicService { return rtableDesc; } - private List<TableDesc> cloneTableDesc(List<TableDesc> tables) throws IOException { List<TableDesc> descs = new ArrayList<TableDesc>(); Iterator<TableDesc> it = tables.iterator(); @@ -255,7 +314,7 @@ public class TableService extends BasicService { } @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN) - public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException { + public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws Exception { MetadataManager metaMgr = getMetadataManager(); ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig()); for (String table : tables) { @@ -274,7 +333,7 @@ public class TableService extends BasicService { * @param tableName */ @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN) - public void calculateCardinality(String tableName, String submitter) throws IOException { + public void calculateCardinality(String tableName, String submitter) throws Exception { tableName = normalizeHiveTableName(tableName); TableDesc table = getMetadataManager().getTableDesc(tableName); final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName); @@ -295,7 +354,7 @@ public class TableService extends BasicService { MapReduceExecutable step1 = new MapReduceExecutable(); - step1.setMapReduceJobClass(HiveColumnCardinalityJob.class); + step1.setMapReduceJobClass(org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob.class); step1.setMapReduceParams(param); step1.setParam("segmentId", tableName); @@ -303,7 +362,7 @@ public class TableService extends BasicService { HadoopShellExecutable step2 = new HadoopShellExecutable(); - step2.setJobClass(HiveColumnCardinalityUpdateJob.class); + step2.setJobClass(org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob.class); step2.setJobParams(param); step2.setParam("segmentId", tableName); job.addTask(step2); @@ -313,7 +372,7 @@ public class TableService extends BasicService { getExecutableManager().addJob(job); } - public String normalizeHiveTableName(String tableName){ + public String normalizeHiveTableName(String tableName) { String[] dbTableName = HadoopUtil.parseHiveTableName(tableName); return (dbTableName[0] + "." + dbTableName[1]).toUpperCase(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java index 5c3eeb3..33285bd 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java @@ -38,8 +38,6 @@ import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.msg.Message; import org.apache.kylin.rest.msg.MsgPicker; -import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob; -import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +74,7 @@ public class TableServiceV2 extends TableService { private KafkaConfigService kafkaConfigService; @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN) - public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException { + public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws Exception { MetadataManager metaMgr = getMetadataManager(); ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig()); for (String table : tables) { @@ -95,7 +93,7 @@ public class TableServiceV2 extends TableService { * @param tableName */ @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN) - public void calculateCardinality(String tableName, String submitter) throws IOException { + public void calculateCardinality(String tableName, String submitter) throws Exception { Message msg = MsgPicker.getMsg(); tableName = normalizeHiveTableName(tableName); @@ -118,7 +116,7 @@ public class TableServiceV2 extends TableService { MapReduceExecutable step1 = new MapReduceExecutable(); - step1.setMapReduceJobClass(HiveColumnCardinalityJob.class); + step1.setMapReduceJobClass(org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob.class); step1.setMapReduceParams(param); step1.setParam("segmentId", tableName); @@ -126,7 +124,7 @@ public class TableServiceV2 extends TableService { HadoopShellExecutable step2 = new HadoopShellExecutable(); - step2.setJobClass(HiveColumnCardinalityUpdateJob.class); + step2.setJobClass(org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob.class); step2.setJobParams(param); step2.setParam("segmentId", tableName); job.addTask(step2); @@ -187,7 +185,7 @@ public class TableServiceV2 extends TableService { return rtn; } - public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile) throws IOException { + public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile) throws Exception { String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); Map<String, String[]> result = new HashMap<String, String[]>(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java new file mode 100644 index 0000000..0f7152b --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.source.hive; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.source.ISourceMetadataExplorer; + +public class HiveMetadataExplorer implements ISourceMetadataExplorer { + + @Override + public List<String> listDatabases() throws Exception { + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + return hiveClient.getHiveDbNames(); + } + + @Override + public List<String> listTables(String database) throws Exception { + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + return hiveClient.getHiveTableNames(database); + } + + @Override + public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String tableName) { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + MetadataManager metaMgr = MetadataManager.getInstance(config); + + HiveTableMeta hiveTableMeta; + try { + hiveTableMeta = hiveClient.getHiveTableMeta(database, tableName); + } catch (Exception e) { + throw new RuntimeException("cannot get HiveTableMeta", e); + } + + TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName); + if (tableDesc == null) { + tableDesc = new TableDesc(); + tableDesc.setDatabase(database.toUpperCase()); + tableDesc.setName(tableName.toUpperCase()); + tableDesc.setUuid(UUID.randomUUID().toString()); + tableDesc.setLastModified(0); + } + if (hiveTableMeta.tableType != null) { + tableDesc.setTableType(hiveTableMeta.tableType); + } + + int columnNumber = hiveTableMeta.allColumns.size(); + List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber); + for (int i = 0; i < columnNumber; i++) { + HiveTableMeta.HiveTableColumnMeta field = hiveTableMeta.allColumns.get(i); + ColumnDesc cdesc = new ColumnDesc(); + cdesc.setName(field.name.toUpperCase()); + // use "double" in kylin for "float" + if ("float".equalsIgnoreCase(field.dataType)) { + cdesc.setDatatype("double"); + } else { + cdesc.setDatatype(field.dataType); + } + cdesc.setId(String.valueOf(i + 1)); + cdesc.setComment(field.comment); + columns.add(cdesc); + } + tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber])); + + StringBuffer partitionColumnString = new StringBuffer(); + for (int i = 0, n = hiveTableMeta.partitionColumns.size(); i < n; i++) { + if (i > 0) + partitionColumnString.append(", "); + partitionColumnString.append(hiveTableMeta.partitionColumns.get(i).name.toUpperCase()); + } + + TableExtDesc tableExtDesc = metaMgr.getTableExt(tableDesc.getIdentity()); + tableExtDesc.addDataSourceProp("location", hiveTableMeta.sdLocation); + tableExtDesc.addDataSourceProp("owner", hiveTableMeta.owner); + tableExtDesc.addDataSourceProp("last_access_time", String.valueOf(hiveTableMeta.lastAccessTime)); + tableExtDesc.addDataSourceProp("partition_column", partitionColumnString.toString()); + tableExtDesc.addDataSourceProp("total_file_size", String.valueOf(hiveTableMeta.fileSize)); + tableExtDesc.addDataSourceProp("total_file_number", String.valueOf(hiveTableMeta.fileNum)); + tableExtDesc.addDataSourceProp("hive_inputFormat", hiveTableMeta.sdInputFormat); + tableExtDesc.addDataSourceProp("hive_outputFormat", hiveTableMeta.sdOutputFormat); + tableExtDesc.addDataSourceProp("skip_header_line_count", String.valueOf(hiveTableMeta.skipHeaderLineCount)); + + return Pair.newPair(tableDesc, tableExtDesc); + } + + @Override + public List<String> getRelatedKylinResources(TableDesc table) { + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java index af0a519..77c8582 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java @@ -18,22 +18,22 @@ package org.apache.kylin.source.hive; -import java.util.List; - -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.IReadableTable; import org.apache.kylin.source.ISource; -import org.apache.kylin.source.ReadableTable; - -import com.google.common.collect.Lists; +import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.SourcePartition; //used by reflection public class HiveSource implements ISource { + @Override + public ISourceMetadataExplorer getSourceMetadataExplorer() { + return new HiveMetadataExplorer(); + } + @SuppressWarnings("unchecked") @Override public <I> I adaptToBuildEngine(Class<I> engineInterface) { @@ -45,33 +45,13 @@ public class HiveSource implements ISource { } @Override - public ReadableTable createReadableTable(TableDesc tableDesc) { + public IReadableTable createReadableTable(TableDesc tableDesc) { return new HiveTable(tableDesc); } @Override - public List<String> getMRDependentResources(TableDesc table) { - return Lists.newArrayList(); - } - - @Override - public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) { + public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) { SourcePartition result = SourcePartition.getCopyOf(srcPartition); - CubeInstance cube = (CubeInstance) buildable; - if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == true) { - // normal partitioned cube - if (result.getStartDate() == 0) { - final CubeSegment last = cube.getLastSegment(); - if (last != null) { - result.setStartDate(last.getDateRangeEnd()); - } - } - } else { - // full build - result.setStartDate(0); - result.setEndDate(Long.MAX_VALUE); - } - result.setStartOffset(0); result.setEndOffset(0); return result; http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java deleted file mode 100644 index 87edfe4..0000000 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.source.hive; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TableExtDesc; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.LinkedHashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; - -/** - * Management class to sync hive table metadata with command See main method for - * how to use the class - * - * @author jianliu - */ -public class HiveSourceTableLoader { - - @SuppressWarnings("unused") - private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class); - - public static Set<String> loadHiveTables(String[] hiveTables, KylinConfig config) throws IOException { - - SetMultimap<String, String> db2tables = LinkedHashMultimap.create(); - for (String fullTableName : hiveTables) { - String[] parts = HadoopUtil.parseHiveTableName(fullTableName); - db2tables.put(parts[0], parts[1]); - } - - IHiveClient hiveClient = HiveClientFactory.getHiveClient(); - SchemaChecker checker = new SchemaChecker(hiveClient, MetadataManager.getInstance(config), CubeManager.getInstance(config)); - for (Map.Entry<String, String> entry : db2tables.entries()) { - SchemaChecker.CheckResult result = checker.allowReload(entry.getKey(), entry.getValue()); - result.raiseExceptionWhenInvalid(); - } - - // extract from hive - Set<String> loadedTables = Sets.newHashSet(); - for (String database : db2tables.keySet()) { - List<String> loaded = extractHiveTables(database, db2tables.get(database), hiveClient); - loadedTables.addAll(loaded); - } - - return loadedTables; - } - - private static List<String> extractHiveTables(String database, Set<String> tables, IHiveClient hiveClient) throws IOException { - - List<String> loadedTables = Lists.newArrayList(); - MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); - for (String tableName : tables) { - HiveTableMeta hiveTableMeta; - try { - hiveTableMeta = hiveClient.getHiveTableMeta(database, tableName); - } catch (Exception e) { - throw new RuntimeException("cannot get HiveTableMeta", e); - } - - TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName); - if (tableDesc == null) { - tableDesc = new TableDesc(); - tableDesc.setDatabase(database.toUpperCase()); - tableDesc.setName(tableName.toUpperCase()); - tableDesc.setUuid(UUID.randomUUID().toString()); - tableDesc.setLastModified(0); - } - if (hiveTableMeta.tableType != null) { - tableDesc.setTableType(hiveTableMeta.tableType); - } - - int columnNumber = hiveTableMeta.allColumns.size(); - List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber); - for (int i = 0; i < columnNumber; i++) { - HiveTableMeta.HiveTableColumnMeta field = hiveTableMeta.allColumns.get(i); - ColumnDesc cdesc = new ColumnDesc(); - cdesc.setName(field.name.toUpperCase()); - // use "double" in kylin for "float" - if ("float".equalsIgnoreCase(field.dataType)) { - cdesc.setDatatype("double"); - } else { - cdesc.setDatatype(field.dataType); - } - cdesc.setId(String.valueOf(i + 1)); - cdesc.setComment(field.comment); - columns.add(cdesc); - } - tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber])); - - StringBuffer partitionColumnString = new StringBuffer(); - for (int i = 0, n = hiveTableMeta.partitionColumns.size(); i < n; i++) { - if (i > 0) - partitionColumnString.append(", "); - partitionColumnString.append(hiveTableMeta.partitionColumns.get(i).name.toUpperCase()); - } - - TableExtDesc tableExtDesc = metaMgr.getTableExt(tableDesc.getIdentity()); - tableExtDesc.addDataSourceProp("location", hiveTableMeta.sdLocation); - tableExtDesc.addDataSourceProp("owner", hiveTableMeta.owner); - tableExtDesc.addDataSourceProp("last_access_time", String.valueOf(hiveTableMeta.lastAccessTime)); - tableExtDesc.addDataSourceProp("partition_column", partitionColumnString.toString()); - tableExtDesc.addDataSourceProp("total_file_size", String.valueOf(hiveTableMeta.fileSize)); - tableExtDesc.addDataSourceProp("total_file_number", String.valueOf(hiveTableMeta.fileNum)); - tableExtDesc.addDataSourceProp("hive_inputFormat", hiveTableMeta.sdInputFormat); - tableExtDesc.addDataSourceProp("hive_outputFormat", hiveTableMeta.sdOutputFormat); - tableExtDesc.addDataSourceProp("skip_header_line_count", String.valueOf(hiveTableMeta.skipHeaderLineCount)); - - metaMgr.saveTableExt(tableExtDesc); - metaMgr.saveSourceTable(tableDesc); - - loadedTables.add(tableDesc.getIdentity()); - } - return loadedTables; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java index 83e49e9..14ed1f9 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java @@ -25,13 +25,13 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.engine.mr.DFSFileTable; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** */ -public class HiveTable implements ReadableTable { +public class HiveTable implements IReadableTable { private static final Logger logger = LoggerFactory.getLogger(HiveTable.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java index 8309a8c..75f322f 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java @@ -33,7 +33,7 @@ import org.apache.hive.hcatalog.data.transfer.DataTransferFactory; import org.apache.hive.hcatalog.data.transfer.HCatReader; import org.apache.hive.hcatalog.data.transfer.ReadEntity; import org.apache.hive.hcatalog.data.transfer.ReaderContext; -import org.apache.kylin.source.ReadableTable.TableReader; +import org.apache.kylin.source.IReadableTable.TableReader; /** * An implementation of TableReader with HCatalog for Hive table. http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java b/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java deleted file mode 100644 index dbcfc7a..0000000 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.source.hive; - -import static com.google.common.base.Preconditions.checkNotNull; -import static java.lang.String.format; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.annotation.Nullable; - -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TblColRef; - -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -public class SchemaChecker { - private final IHiveClient hiveClient; - private final MetadataManager metadataManager; - private final CubeManager cubeManager; - - static class CheckResult { - private final boolean valid; - private final String reason; - - private CheckResult(boolean valid, String reason) { - this.valid = valid; - this.reason = reason; - } - - void raiseExceptionWhenInvalid() { - if (!valid) { - throw new RuntimeException(reason); - } - } - - static CheckResult validOnFirstLoad(String tableName) { - return new CheckResult(true, format("Table '%s' hasn't been loaded before", tableName)); - } - - static CheckResult validOnCompatibleSchema(String tableName) { - return new CheckResult(true, format("Table '%s' is compatible with all existing cubes", tableName)); - } - - static CheckResult invalidOnFetchSchema(String tableName, Exception e) { - return new CheckResult(false, format("Failed to fetch metadata of '%s': %s", tableName, e.getMessage())); - } - - static CheckResult invalidOnIncompatibleSchema(String tableName, List<String> reasons) { - StringBuilder buf = new StringBuilder(); - for (String reason : reasons) { - buf.append("- ").append(reason).append("\n"); - } - - return new CheckResult(false, format("Found %d issue(s) with '%s':%n%s Please disable and purge related cube(s) first", reasons.size(), tableName, buf.toString())); - } - } - - SchemaChecker(IHiveClient hiveClient, MetadataManager metadataManager, CubeManager cubeManager) { - this.hiveClient = checkNotNull(hiveClient, "hiveClient is null"); - this.metadataManager = checkNotNull(metadataManager, "metadataManager is null"); - this.cubeManager = checkNotNull(cubeManager, "cubeManager is null"); - } - - private List<HiveTableMeta.HiveTableColumnMeta> fetchSchema(String dbName, String tblName) throws Exception { - List<HiveTableMeta.HiveTableColumnMeta> columnMetas = Lists.newArrayList(); - columnMetas.addAll(hiveClient.getHiveTableMeta(dbName, tblName).allColumns); - return columnMetas; - } - - private List<CubeInstance> findCubeByTable(final String fullTableName) { - Iterable<CubeInstance> relatedCubes = Iterables.filter(cubeManager.listAllCubes(), new Predicate<CubeInstance>() { - @Override - public boolean apply(@Nullable CubeInstance cube) { - if (cube == null || cube.allowBrokenDescriptor()) { - return false; - } - DataModelDesc model = cube.getModel(); - if (model == null) - return false; - return model.containsTable(fullTableName); - } - }); - - return ImmutableList.copyOf(relatedCubes); - } - - private boolean isColumnCompatible(ColumnDesc column, HiveTableMeta.HiveTableColumnMeta field) { - if (!column.getName().equalsIgnoreCase(field.name)) { - return false; - } - - String typeStr = field.dataType; - // kylin uses double internally for float, see HiveSourceTableLoader.java - // TODO should this normalization to be in DataType class ? - if ("float".equalsIgnoreCase(typeStr)) { - typeStr = "double"; - } - DataType fieldType = DataType.getType(typeStr); - - if (column.getType().isIntegerFamily()) { - // OLAPTable.listSourceColumns converts some integer columns to bigint, - // therefore strict type comparison won't work. - // changing from one integer type to another should be fine. - return fieldType.isIntegerFamily(); - } else { - // only compare base type name, changing precision or scale should be fine - return column.getTypeName().equals(fieldType.getName()); - } - } - - /** - * check whether all columns used in `cube` has compatible schema in current hive schema denoted by `fieldsMap`. - * @param cube cube to check, must use `table` in its model - * @param table kylin's table metadata - * @param fieldsMap current hive schema of `table` - * @return true if all columns used in `cube` has compatible schema with `fieldsMap`, false otherwise - */ - private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc table, Map<String, HiveTableMeta.HiveTableColumnMeta> fieldsMap) { - Set<ColumnDesc> usedColumns = Sets.newHashSet(); - for (TblColRef col : cube.getAllColumns()) { - usedColumns.add(col.getColumnDesc()); - } - - List<String> violateColumns = Lists.newArrayList(); - for (ColumnDesc column : table.getColumns()) { - if (!column.isComputedColumnn() && usedColumns.contains(column)) { - HiveTableMeta.HiveTableColumnMeta field = fieldsMap.get(column.getName()); - if (field == null || !isColumnCompatible(column, field)) { - violateColumns.add(column.getName()); - } - } - } - return violateColumns; - } - - /** - * check whether all columns in `table` are still in `fields` and have the same index as before. - * - * @param table kylin's table metadata - * @param fields current table metadata in hive - * @return true if only new columns are appended in hive, false otherwise - */ - private boolean checkAllColumnsInTableDesc(TableDesc table, List<HiveTableMeta.HiveTableColumnMeta> fields) { - if (table.getColumnCount() > fields.size()) { - return false; - } - - ColumnDesc[] columns = table.getColumns(); - for (int i = 0; i < columns.length; i++) { - if (!isColumnCompatible(columns[i], fields.get(i))) { - return false; - } - } - return true; - } - - public CheckResult allowReload(String dbName, String tblName) { - final String fullTableName = (dbName + "." + tblName).toUpperCase(); - - TableDesc existing = metadataManager.getTableDesc(fullTableName); - if (existing == null) { - return CheckResult.validOnFirstLoad(fullTableName); - } - - List<HiveTableMeta.HiveTableColumnMeta> currentFields; - Map<String, HiveTableMeta.HiveTableColumnMeta> currentFieldsMap = Maps.newHashMap(); - try { - currentFields = fetchSchema(dbName, tblName); - } catch (Exception e) { - return CheckResult.invalidOnFetchSchema(fullTableName, e); - } - for (HiveTableMeta.HiveTableColumnMeta field : currentFields) { - currentFieldsMap.put(field.name.toUpperCase(), field); - } - - List<String> issues = Lists.newArrayList(); - for (CubeInstance cube : findCubeByTable(fullTableName)) { - String modelName = cube.getModel().getName(); - - // if user reloads a fact table used by cube, then all used columns must match current schema - if (cube.getModel().isFactTable(fullTableName)) { - TableDesc factTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); - List<String> violateColumns = checkAllColumnsInCube(cube, factTable, currentFieldsMap); - if (!violateColumns.isEmpty()) { - issues.add(format("Column %s used in cube[%s] and model[%s], but changed in hive", violateColumns, cube.getName(), modelName)); - } - } - - // if user reloads a lookup table used by cube, only append column(s) are allowed, all existing columns - // must be the same (except compatible type changes) - if (cube.getModel().isLookupTable(fullTableName)) { - TableDesc lookupTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); - if (!checkAllColumnsInTableDesc(lookupTable, currentFields)) { - issues.add(format("Table '%s' is used as Lookup Table in cube[%s] and model[%s], but changed in hive", lookupTable.getIdentity(), cube.getName(), modelName)); - } - } - } - - if (issues.isEmpty()) { - return CheckResult.validOnCompatibleSchema(fullTableName); - } - return CheckResult.invalidOnIncompatibleSchema(fullTableName, issues); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/0a0edfef/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 6689c6e..52d2e6f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -24,14 +24,17 @@ import java.util.Map; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.source.ISource; -import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.source.kafka.util.KafkaClient; @@ -56,20 +59,12 @@ public class KafkaSource implements ISource { } @Override - public ReadableTable createReadableTable(TableDesc tableDesc) { + public IReadableTable createReadableTable(TableDesc tableDesc) { throw new UnsupportedOperationException(); } @Override - public List<String> getMRDependentResources(TableDesc table) { - List<String> dependentResources = Lists.newArrayList(); - dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity())); - dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity())); - return dependentResources; - } - - @Override - public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) { + public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) { checkSourceOffsets(srcPartition); final SourcePartition result = SourcePartition.getCopyOf(srcPartition); final CubeInstance cube = (CubeInstance) buildable; @@ -185,4 +180,33 @@ public class KafkaSource implements ISource { } } + @Override + public ISourceMetadataExplorer getSourceMetadataExplorer() { + return new ISourceMetadataExplorer() { + + @Override + public List<String> listDatabases() throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public List<String> listTables(String database) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public List<String> getRelatedKylinResources(TableDesc table) { + List<String> dependentResources = Lists.newArrayList(); + dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity())); + dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity())); + return dependentResources; + } + }; + } + }