This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d502c52ae3198c0bad39bde9dd3e244b67b95520
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Tue May 19 12:02:47 2020 +0800

    KYLIN-4489 Add hive table compatibility check rest service
---
 .../apache/kylin/common/restclient/RestClient.java |  7 +++
 .../kylin/rest/controller/MigrationController.java | 19 ++++++++
 .../rest/service/TableSchemaUpdateChecker.java     | 56 ++++++++++++++++++++--
 .../apache/kylin/rest/service/TableService.java    | 22 +++++++++
 server/src/main/resources/kylinSecurity.xml        |  2 +
 5 files changed, 103 insertions(+), 3 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index a9971dd..d908f58 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -353,6 +353,13 @@ public class RestClient {
     }
 
     public void checkCompatibility(String jsonRequest) throws IOException {
+        checkCompatibility(jsonRequest, false); // single-argument form never requests the hive table check
+    }
+
+    public void checkCompatibility(String jsonRequest, boolean ifHiveCheck) throws IOException {
+        if (ifHiveCheck) { // NOTE(review): no return/else here, so the generic check below still runs afterwards - confirm intended
+            checkCompatibility(jsonRequest, baseUrl + "/cubes/checkCompatibility/hiveTable");
+        }
         checkCompatibility(jsonRequest, baseUrl + "/cubes/checkCompatibility");
     }
 
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
index 106d51f..efef5cf 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
@@ -163,6 +163,25 @@ public class MigrationController extends BasicController {
         }
     }
 
+    /**
+     * Checks every table in the request via TableService.checkHiveTableCompatibility; any failure is rethrown as ConflictException.
+     */
+    @RequestMapping(value = "/checkCompatibility/hiveTable", method = { RequestMethod.POST })
+    @ResponseBody
+    public void checkHiveTableCompatibility(@RequestBody CompatibilityCheckRequest request) {
+        try {
+            List<TableDesc> tableDescList = deserializeTableDescList(request);
+            for (TableDesc tableDesc : tableDescList) { // the first failing table aborts the loop through the catch below
+                logger.info("Schema compatibility check for table {}", tableDesc.getName());
+                tableService.checkHiveTableCompatibility(request.getProjectName(), tableDesc);
+                logger.info("Pass schema compatibility check for table {}", tableDesc.getName());
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new ConflictException(e.getMessage(), e);
+        }
+    }
+
     private List<TableDesc> deserializeTableDescList(CompatibilityCheckRequest request) {
         List<TableDesc> result = Lists.newArrayList();
         try {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
index f03acb8..3a45f49 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
@@ -23,22 +23,23 @@ import static java.lang.String.format;
 
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.ModelDimensionDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.shaded.com.google.common.base.Predicate;
 import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
 import org.apache.kylin.shaded.com.google.common.collect.Iterables;
@@ -340,4 +341,53 @@ public class TableSchemaUpdateChecker {
 
         return usedColumns;
     }
+
+    public CheckResult allowMigrate(TableDesc newTableDesc, TableDesc hiveTableDesc) throws Exception { // valid only when no column issue is collected
+        final String fullTableName = newTableDesc.getIdentity();
+
+        List<String> issues = Lists.newArrayList();
+        checkAllColumnsInHiveTableDesc(hiveTableDesc, newTableDesc, issues);
+        if (issues.isEmpty()) {
+            return new CheckResult(true,
+                    format(Locale.ROOT, "Table '%s' is compatible with existing hive table", fullTableName));
+        } else {
+            return new CheckResult(false, format(Locale.ROOT,
+                    "Table '%s' is incompatible with existing hive table due to '%s'", fullTableName, issues));
+        }
+    }
+
+    private void checkAllColumnsInHiveTableDesc(TableDesc hiveTable, TableDesc newTable, List<String> issues) { // appends findings to issues
+        ColumnDesc[] hiveTableCols = hiveTable.getColumns();
+        ColumnDesc[] newTableCols = newTable.getColumns();
+
+        if (hiveTableCols.length < newTableCols.length) { // hive has fewer columns: report the unmatched names and stop
+            Set<String> colNamesNew = Lists.newArrayList(newTableCols).stream() // upper-cased like the maps below
+                    .map(input -> input.getName().toUpperCase(Locale.ROOT)).collect(Collectors.toSet());
+            Set<String> colNamesHive = Lists.newArrayList(hiveTableCols).stream() // same normalisation on the hive side
+                    .map(input -> input.getName().toUpperCase(Locale.ROOT)).collect(Collectors.toSet());
+            colNamesNew.removeAll(colNamesHive);
+            issues.add(format(Locale.ROOT, "columns %s are not existing in hive table", colNamesNew));
+            return;
+        }
+
+        Map<String, ColumnDesc> hiveColMap = Lists.newArrayList(hiveTableCols).stream() // keyed by upper-cased column name
+                .collect(Collectors.toMap(input -> input.getName().toUpperCase(Locale.ROOT), input -> input));
+        Map<String, ColumnDesc> newColMap = Lists.newArrayList(newTableCols).stream()
+                .collect(Collectors.toMap(input -> input.getName().toUpperCase(Locale.ROOT), input -> input));
+
+        List<String> violateColumns = Lists.newArrayList();
+        for (String colName : newColMap.keySet()) {
+            ColumnDesc hiveCol = hiveColMap.get(colName);
+            if (hiveCol == null) { // missing columns are reported one by one
+                issues.add(format(Locale.ROOT, "column %s is not existing in hive table", colName));
+                continue;
+            }
+            if (!isColumnCompatible(hiveCol, newColMap.get(colName))) {
+                violateColumns.add(colName);
+            }
+        }
+        if (!violateColumns.isEmpty()) { // incompatible columns are reported together in a single issue
+            issues.add(format(Locale.ROOT, "Columns %s are incompatible in hive", violateColumns));
+        }
+    }
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 764a32a..0420ad8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -125,6 +125,28 @@ public class TableService extends BasicService {
         TableSchemaUpdateChecker.CheckResult result = getSchemaUpdateChecker().allowReload(tableDesc, prj);
         result.raiseExceptionWhenInvalid();
     }
+
+    public void checkHiveTableCompatibility(String prj, TableDesc tableDesc) throws Exception { // raises when tableDesc does not fit the source table
+        Preconditions.checkNotNull(tableDesc.getDatabase());
+        Preconditions.checkNotNull(tableDesc.getName());
+
+        String database = tableDesc.getDatabase().toUpperCase(Locale.ROOT);
+        String tableName = tableDesc.getName().toUpperCase(Locale.ROOT);
+        ProjectInstance projectInstance = getProjectManager().getProject(prj);
+        ISourceMetadataExplorer explr = SourceManager.getSource(projectInstance).getSourceMetadataExplorer();
+
+        TableDesc hiveTableDesc;
+        try {
+            Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(database, tableName, prj);
+            hiveTableDesc = pair.getFirst(); // only the TableDesc half of the pair is needed here
+        } catch (Exception e) {
+            logger.error("Fail to get metadata for hive table {} due to ", tableDesc.getIdentity(), e);
+            throw new RuntimeException("Fail to get metadata for hive table " + tableDesc.getIdentity(), e); // keep the cause
+        }
+
+        TableSchemaUpdateChecker.CheckResult result = getSchemaUpdateChecker().allowMigrate(tableDesc, hiveTableDesc);
+        result.raiseExceptionWhenInvalid();
+    }
 
     public List<TableDesc> getTableDescByProject(String project, boolean withExt) throws IOException {
         aclEvaluate.checkProjectReadPermission(project);
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index baf7172..912f700 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -237,6 +237,7 @@
 
             <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/cubes/checkCompatibility" access="permitAll"/>
+            <scr:intercept-url pattern="/api/cubes/checkCompatibility/hiveTable" access="permitAll"/>
             <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>
@@ -290,6 +291,7 @@
 
             <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/cubes/checkCompatibility" access="permitAll"/>
+            <scr:intercept-url pattern="/api/cubes/checkCompatibility/hiveTable" access="permitAll"/>
             <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>