This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d502c52ae3198c0bad39bde9dd3e244b67b95520
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Tue May 19 12:02:47 2020 +0800

    KYLIN-4489 Add hive table compatibility check rest service
---
 .../apache/kylin/common/restclient/RestClient.java |  7 +++
 .../kylin/rest/controller/MigrationController.java | 19 ++++++++
 .../rest/service/TableSchemaUpdateChecker.java     | 56 ++++++++++++++++++++--
 .../apache/kylin/rest/service/TableService.java    | 22 +++++++++
 server/src/main/resources/kylinSecurity.xml        |  2 +
 5 files changed, 103 insertions(+), 3 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index a9971dd..d908f58 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -353,6 +353,22 @@ public class RestClient {
     }
 
     public void checkCompatibility(String jsonRequest) throws IOException {
+        checkCompatibility(jsonRequest, false);
+    }
+
+    /**
+     * Sends {@code jsonRequest} to the remote compatibility check endpoint(s).
+     *
+     * @param jsonRequest the serialized compatibility check request
+     * @param ifHiveCheck whether to also call the hive table check endpoint; when set, that
+     *                    endpoint is called before the cube check endpoint
+     * @throws IOException propagated from the underlying requests
+     */
+    public void checkCompatibility(String jsonRequest, boolean ifHiveCheck) throws IOException {
+        if (ifHiveCheck) {
+            String hiveTableCheckUrl = baseUrl + "/cubes/checkCompatibility/hiveTable";
+            checkCompatibility(jsonRequest, hiveTableCheckUrl);
+        }
         checkCompatibility(jsonRequest, baseUrl + "/cubes/checkCompatibility");
     }
 
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
index 106d51f..efef5cf 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
@@ -163,6 +163,29 @@ public class MigrationController extends BasicController {
         }
     }
 
+    /**
+     * Delegates to {@code tableService.checkHiveTableCompatibility} for every table in the request.
+     *
+     * <p>Any exception raised while deserializing or checking the tables is logged and rethrown
+     * as a {@link ConflictException} carrying the same message.
+     */
+    @RequestMapping(value = "/checkCompatibility/hiveTable", method = { RequestMethod.POST })
+    @ResponseBody
+    public void checkHiveTableCompatibility(@RequestBody CompatibilityCheckRequest request) {
+        try {
+            List<TableDesc> tables = deserializeTableDescList(request);
+            for (TableDesc table : tables) {
+                String name = table.getName();
+                logger.info("Schema compatibility check for table {}", name);
+                tableService.checkHiveTableCompatibility(request.getProjectName(), table);
+                logger.info("Pass schema compatibility check for table {}", name);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new ConflictException(e.getMessage(), e);
+        }
+    }
+
     private List<TableDesc> deserializeTableDescList(CompatibilityCheckRequest request) {
         List<TableDesc> result = Lists.newArrayList();
         try {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
index f03acb8..3a45f49 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java
@@ -23,22 +23,23 @@ import static java.lang.String.format;
 
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.model.ModelDimensionDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.shaded.com.google.common.base.Predicate;
 import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
 import org.apache.kylin.shaded.com.google.common.collect.Iterables;
@@ -340,4 +341,60 @@ public class TableSchemaUpdateChecker {

         return usedColumns;
     }
+
+    /**
+     * Checks whether {@code newTableDesc} can be migrated onto the hive table described by
+     * {@code hiveTableDesc}.
+     *
+     * @param newTableDesc  the table definition being migrated
+     * @param hiveTableDesc the table definition read from hive to compare against
+     * @return a passing {@link CheckResult} when no column issue is found, otherwise a failing one
+     *         whose message lists every issue
+     */
+    public CheckResult allowMigrate(TableDesc newTableDesc, TableDesc hiveTableDesc) throws Exception {
+        final String fullTableName = newTableDesc.getIdentity();
+
+        List<String> issues = Lists.newArrayList();
+        checkAllColumnsInHiveTableDesc(hiveTableDesc, newTableDesc, issues);
+        if (issues.isEmpty()) {
+            return new CheckResult(true,
+                    format(Locale.ROOT, "Table '%s' is compatible with existing hive table", fullTableName));
+        } else {
+            return new CheckResult(false, format(Locale.ROOT,
+                    "Table '%s' is incompatible with existing hive table due to '%s'", fullTableName, issues));
+        }
+    }
+
+    /**
+     * Appends to {@code issues} one entry naming the columns of {@code newTable} that are missing
+     * from {@code hiveTable}, and one naming those rejected by {@link #isColumnCompatible}.
+     *
+     * <p>Names are always compared case-insensitively (upper-cased with {@link Locale#ROOT}), and
+     * columns are reported in the order {@code newTable} declares them.
+     */
+    private void checkAllColumnsInHiveTableDesc(TableDesc hiveTable, TableDesc newTable, List<String> issues) {
+        // Index hive columns by upper-cased name; on a collision keep the first column instead of
+        // letting Collectors.toMap fail with an IllegalStateException.
+        Map<String, ColumnDesc> hiveColMap = Lists.newArrayList(hiveTable.getColumns()).stream()
+                .collect(Collectors.toMap(col -> col.getName().toUpperCase(Locale.ROOT), col -> col,
+                        (first, second) -> first));
+
+        List<String> missingColumns = Lists.newArrayList();
+        List<String> violateColumns = Lists.newArrayList();
+        for (ColumnDesc newCol : newTable.getColumns()) {
+            String colName = newCol.getName().toUpperCase(Locale.ROOT);
+            ColumnDesc hiveCol = hiveColMap.get(colName);
+            if (hiveCol == null) {
+                missingColumns.add(colName);
+            } else if (!isColumnCompatible(hiveCol, newCol)) {
+                violateColumns.add(colName);
+            }
+        }
+        if (!missingColumns.isEmpty()) {
+            issues.add(format(Locale.ROOT, "columns %s are not existing in hive table", missingColumns));
+        }
+        if (!violateColumns.isEmpty()) {
+            issues.add(format(Locale.ROOT, "Columns %s are incompatible in hive", violateColumns));
+        }
+    }
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 764a32a..0420ad8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -125,6 +125,44 @@ public class TableService extends BasicService {
         TableSchemaUpdateChecker.CheckResult result = getSchemaUpdateChecker().allowReload(tableDesc, prj);
         result.raiseExceptionWhenInvalid();
     }
+
+    /**
+     * Checks that {@code tableDesc} is compatible with the hive table of the same database and
+     * name, as loaded by the source metadata explorer of project {@code prj}.
+     *
+     * <p>NOTE(review): the REST endpoint calling this appears to be {@code permitAll} in
+     * kylinSecurity.xml and no project ACL check is made here; confirm that is intended.
+     *
+     * @param prj       name of the project whose data source is queried
+     * @param tableDesc the table definition to check; its database and name must be non-null
+     * @throws NullPointerException if the database or name of {@code tableDesc} is null, or no
+     *                              project instance is found for {@code prj}
+     * @throws RuntimeException     if the hive table metadata cannot be loaded; an incompatible
+     *                              table is reported through {@code raiseExceptionWhenInvalid()}
+     */
+    public void checkHiveTableCompatibility(String prj, TableDesc tableDesc) throws Exception {
+        // Explicit messages, because callers may only forward getMessage() to the client.
+        Preconditions.checkNotNull(tableDesc.getDatabase(), "Database of the table must not be null");
+        Preconditions.checkNotNull(tableDesc.getName(), "Name of the table must not be null");
+
+        String database = tableDesc.getDatabase().toUpperCase(Locale.ROOT);
+        String tableName = tableDesc.getName().toUpperCase(Locale.ROOT);
+        ProjectInstance projectInstance = getProjectManager().getProject(prj);
+        Preconditions.checkNotNull(projectInstance, "Project %s does not exist", prj);
+        ISourceMetadataExplorer explr = SourceManager.getSource(projectInstance).getSourceMetadataExplorer();
+
+        TableDesc hiveTableDesc;
+        try {
+            Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(database, tableName, prj);
+            hiveTableDesc = pair.getFirst();
+        } catch (Exception e) {
+            logger.error("Fail to get metadata for hive table {} due to ", tableDesc.getIdentity(), e);
+            throw new RuntimeException("Fail to get metadata for hive table " + tableDesc.getIdentity(), e);
+        }
+
+        TableSchemaUpdateChecker.CheckResult result = getSchemaUpdateChecker().allowMigrate(tableDesc, hiveTableDesc);
+        result.raiseExceptionWhenInvalid();
+    }
     
     public List<TableDesc> getTableDescByProject(String project, boolean withExt) throws IOException {
         aclEvaluate.checkProjectReadPermission(project);
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index baf7172..912f700 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -237,6 +237,7 @@
 
             <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/cubes/checkCompatibility" access="permitAll"/>
+            <scr:intercept-url pattern="/api/cubes/checkCompatibility/hiveTable" access="permitAll"/>
             <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>
@@ -290,6 +291,7 @@
 
             <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/cubes/checkCompatibility" access="permitAll"/>
+            <scr:intercept-url pattern="/api/cubes/checkCompatibility/hiveTable" access="permitAll"/>
             <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/>
             <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/>

Reply via email to