This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit db8cb0e9022911c6a907a584b434c9102b51c265 Author: ChenLiang <31469905+yab...@users.noreply.github.com> AuthorDate: Sat Oct 29 19:40:21 2022 +0800 KYLIN-5374 support DDL on view KYLIN-5374 support DDL on view --- pom.xml | 5 + .../kylin/rest/controller/NAdminController.java | 1 + src/common-service/pom.xml | 4 + .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../kylin/common/exception/ServerErrorCode.java | 5 +- .../org/apache/kylin/common/msg/CnMessage.java | 35 ++++++ .../java/org/apache/kylin/common/msg/Message.java | 37 ++++++ .../resources/kylin_errorcode_conf_en.properties | 2 + .../resources/kylin_errorcode_conf_zh.properties | 2 + src/datasource-service/pom.xml | 13 +- .../apache/kylin/rest/request/ViewDDLRequest.java | 31 +++++ .../apache/kylin/rest/service/SparkDDLService.java | 75 +++++++++++ .../apache/kylin/rest/service/SparkDDLTest.java | 137 +++++++++++++++++++++ .../spark/sql/common/SparkDDLTestUtils.scala | 137 +++++++++++++++++++++ .../kylin/rest/controller/SparkDDLController.java | 68 ++++++++++ .../rest/controller/SparkDDLControllerTest.java | 89 +++++++++++++ .../spark/source/NSparkMetadataExplorer.java | 37 ++++++ src/spark-project/spark-ddl-plugin/pom.xml | 58 +++++++++ .../java/org/apache/kylin/spark/ddl/DDLCheck.java | 35 ++++++ .../apache/kylin/spark/ddl/DDLCheckContext.java | 50 ++++++++ .../apache/kylin/spark/ddl/SourceTableCheck.java | 76 ++++++++++++ .../services/org.apache.kylin.spark.ddl.DDLCheck | 2 + .../org/apache/kylin/spark/ddl/ViewCheck.scala | 123 ++++++++++++++++++ 23 files changed, 1024 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 5c1ba4eecc..896608f332 100644 --- a/pom.xml +++ b/pom.xml @@ -652,6 +652,11 @@ <artifactId>kylin-soft-affinity-cache</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-spark-ddl</artifactId> + <version>${project.version}</version> + </dependency> <dependency> 
<groupId>io.dropwizard.metrics</groupId> diff --git a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java index 802b044449..d134bf04b9 100644 --- a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java +++ b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java @@ -74,6 +74,7 @@ public class NAdminController extends NBasicController { propertyKeys.add("kylin.streaming.enabled"); propertyKeys.add("kylin.model.measure-name-check-enabled"); propertyKeys.add("kylin.security.remove-ldap-custom-security-limit-enabled"); + propertyKeys.add("kylin.source.ddl.enabled"); // add second storage if (StringUtils.isNotEmpty(KylinConfig.getInstanceFromEnv().getSecondStorage())) { diff --git a/src/common-service/pom.xml b/src/common-service/pom.xml index 10acf44d80..40c94752dc 100644 --- a/src/common-service/pom.xml +++ b/src/common-service/pom.xml @@ -50,6 +50,10 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-tool</artifactId> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-spark-ddl</artifactId> + </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index e8d777d013..9c4601705f 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -3661,4 +3661,8 @@ public abstract class KylinConfigBase implements Serializable { public int getSecondStorageWaitLockTimeout() { return Integer.parseInt(getOptional("kylin.second-storage.wait-lock-timeout", "180")); } + + public boolean getDDLEnabled(){ + return 
Boolean.parseBoolean(getOptional("kylin.source.ddl.enabled", FALSE)); + } } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java index 856a5da4db..1e6c9722bf 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java @@ -285,7 +285,10 @@ public enum ServerErrorCode implements ErrorCodeSupplier { INVALID_JDBC_SOURCE_CONFIG("KE-010039001"), // // 10040XXX cache - REDIS_CLEAR_ERROR("KE-010040001"); // + REDIS_CLEAR_ERROR("KE-010040001"), // + + // 10050XXX SQL DDL + DDL_CHECK_ERROR("KE-010050001"); private final ErrorCode errorCode; diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java index 3688c2db26..3306e85266 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java @@ -1754,4 +1754,39 @@ public class CnMessage extends Message { public String getSecondStorageNodeNotAvailable(String nodeName) { return String.format(Locale.ROOT, "分层存储节点'%s'不可用。", nodeName); } + + @Override + public String getDDLUnSupported() { + return "不支持的 DDL 语法,仅支持 `create view`, `drop view`, `alter view`, `show create view` 语法"; + } + + @Override + public String getDDLViewNameError() { + return "视图名需要以 KE_ 开头"; + } + + @Override + public String getDDLDropError() { + return "仅支持删除 view 类型表且 view 名称需要以 KE_ 开头"; + } + + @Override + public String getDDLTableNotLoad(String table) { + return String.format(Locale.ROOT, "'%s' 没有加载到数据源", table); + } + + @Override + public String getDDLTableNotSupport(String table) { + return String.format(Locale.ROOT, "仅支持 hive 数据表,但 '%s' 不是 hive 表", table); + } + + @Override + public String 
getDDLPermissionDenied() { + return "只有系统或者项目管理员可以进行 DDL 操作"; + } + + @Override + public String getDDLDatabaseAccessnDenied() { + return "用户没有视图所在数据库的权限"; + } } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java index d64b7179c1..365474db28 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java @@ -60,6 +60,15 @@ public class Message { private static final String LICENSE_MISMATCH_LICENSE = "The license doesn’t match the current cluster information. Please upload a new license, or contact Kyligence."; private static final String LICENSE_NOT_EFFECTIVE = "License is not effective yet, please apply for a new license."; private static final String LICENSE_EXPIRED = "The license has expired. Please upload a new license, or contact Kyligence."; + private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, only support single `create view`, `drop " + + "view`, `alter view`, `show create table`"; + private static final String DDL_VIEW_NAME_ERROR = "View names need to start with KE_"; + private static final String DDL_DROP_ERROR = "Only support drop view"; + private static final String DDL_TABLE_NOT_LOADED = "Table '%s' is not loaded into the data source "; + private static final String DDL_TABLE_NOT_SUPPORT = "Only support hive table, but '%s' is not hive table"; + private static final String DDL_PERMISSION_DENIED = "Only Administrator or Project Administrator can do DDL operations"; + private static final String DDL_DATABASE_ACCESSN_DENIED = "The user does not have the database permission to " + + "which the view belongs."; protected Message() { @@ -1582,4 +1591,32 @@ public class Message { public String getSecondStorageNodeNotAvailable(String nodeName) { return String.format(Locale.ROOT, "Tiered storage node '%s' not available.", nodeName); } + + public 
String getDDLUnSupported() { + return DDL_UNSUPPORTED; + } + + public String getDDLViewNameError() { + return DDL_VIEW_NAME_ERROR; + } + + public String getDDLDropError() { + return DDL_DROP_ERROR; + } + + public String getDDLTableNotLoad(String table) { + return String.format(Locale.ROOT, DDL_TABLE_NOT_LOADED, table); + } + + public String getDDLTableNotSupport(String table) { + return String.format(Locale.ROOT, DDL_TABLE_NOT_SUPPORT, table); + } + + public String getDDLPermissionDenied() { + return DDL_PERMISSION_DENIED; + } + + public String getDDLDatabaseAccessnDenied() { + return DDL_DATABASE_ACCESSN_DENIED; + } } diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties index 47ebee5f8b..ad67f4540f 100644 --- a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties +++ b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties @@ -273,3 +273,5 @@ KE-010039001=Invalid Connection Info # cache KE-010040001=Clear Redis Cache Failure +# SQL DDL +KE-010050001=DDL Operation Failure \ No newline at end of file diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties index 0e43ba0c5a..ef6f47a33e 100644 --- a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties +++ b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties @@ -272,3 +272,5 @@ KE-010039001=连接信息有误 # cache KE-010040001=清空Redis缓存失败 +# SQL DDL +KE-010050001=DDL操作失败 \ No newline at end of file diff --git a/src/datasource-service/pom.xml b/src/datasource-service/pom.xml index ec0d9cc85f..69e2d8427b 100644 --- a/src/datasource-service/pom.xml +++ b/src/datasource-service/pom.xml @@ -131,5 +131,16 @@ <scope>test</scope> </dependency> </dependencies> - + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + </plugin> + </plugins> + </build> </project> diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ViewDDLRequest.java b/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ViewDDLRequest.java new file mode 100644 index 0000000000..37c29aa835 --- /dev/null +++ b/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ViewDDLRequest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.rest.request; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.Data; + +@Data +public class ViewDDLRequest { + @JsonProperty("sql") + private String sql; + @JsonProperty("project") + private String project; +} diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkDDLService.java b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkDDLService.java new file mode 100644 index 0000000000..7bce47ad22 --- /dev/null +++ b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkDDLService.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.rest.service; + +import static org.apache.kylin.common.exception.ServerErrorCode.DDL_CHECK_ERROR; + +import java.util.Arrays; +import java.util.List; +import java.util.ServiceLoader; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.rest.util.AclPermissionUtil; +import org.apache.kylin.spark.ddl.DDLCheck; +import org.apache.kylin.spark.ddl.DDLCheckContext; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparderEnv; +import org.springframework.stereotype.Service; + +import com.google.common.collect.Lists; + +import lombok.val; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +public class SparkDDLService extends BasicService { + + private final ServiceLoader<DDLCheck> ddlChecks = ServiceLoader.load(DDLCheck.class); + + public String executeDDLSql(String project, String sql) { + if (!KylinConfig.getInstanceFromEnv().getDDLEnabled()) { + throw new KylinException(DDL_CHECK_ERROR, "DDL function has not been turned on."); + } + val groups = getCurrentUserGroups(); + val context = new DDLCheckContext(sql, project, AclPermissionUtil.getCurrentUsername(), + groups); + for (DDLCheck checker : ddlChecks) { + checker.check(context); + } + final StringBuilder result = new StringBuilder(); + List<Row> rows = SparderEnv.getSparkSession().sql(sql).collectAsList(); + rows.forEach(row -> result.append(row.get(0).toString() + "\n")); + return result.toString(); + } + + public List<List<String>> pluginsDescription(String project) { + if (!KylinConfig.getInstanceFromEnv().getDDLEnabled()) { + throw new KylinException(DDL_CHECK_ERROR, "DDL function has not been turned on."); + } + List<String> descriptionEN = Lists.newArrayList(); + List<String> descriptionCN = Lists.newArrayList(); + for (DDLCheck checker : ddlChecks) { + String[] description = checker.description(project); + descriptionEN.addAll(Arrays.asList(description[0].split("\n"))); + 
descriptionCN.addAll(Arrays.asList(description[1].split("\n"))); + } + return Lists.newArrayList(descriptionEN, descriptionCN); + } +} \ No newline at end of file diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java new file mode 100644 index 0000000000..df4af60863 --- /dev/null +++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.rest.service; + +import java.util.List; + +import org.apache.kylin.common.msg.MsgPicker; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.metadata.model.NTableMetadataManager; +import org.apache.kylin.rest.constant.Constant; +import org.apache.spark.sql.SparderEnv; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.common.SparkDDLTestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.authentication.TestingAuthenticationToken; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.test.util.ReflectionTestUtils; + +public class SparkDDLTest extends NLocalFileMetadataTestCase { + @Autowired + private final SparkDDLService ddlService = Mockito.spy(new SparkDDLService()); + @Autowired + private final IUserGroupService userGroupService = Mockito.spy(NUserGroupService.class); + + private static final String CREATEVIEW_SQL1 = + "CREATE VIEW `ssb`.`ke_order_view` as select LO_ORDERKEY, C_NAME from SSB.P_LINEORDER t1 left join " + + "SSB. CUSTOMER t2 on t1. LO_CUSTKEY = t2. 
C_CUSTKEY"; + private static final String CREATEVIEW_SQL2 = "CREATE VIEW `ssb`.`order_view2` as select * from SSB.P_LINEORDER"; + private static final String CREATEVIEW_SQL3 = "CREATE VIEW `ssb`.`order_view2` as abc"; + private static final String CREATEVIEW_SQL4 = "CREATE VIEW `ssb`.`order_view2` as select * from SSB.unload_table"; + private static final String CREATEVIEW_SQL5 = "CREATE VIEW `ke_order_view2` as select * from SSB.P_LINEORDER"; + private static final String CREATEVIEW_SQL6 = "abc"; + private static final String ALTERVIEW_SQL = + "alter view `ssb`.`ke_order_view` as select lo_orderkey from SSB.P_LINEORDER"; + private static final String DROPVIEW_SQL1 = "drop view `ssb`.`ke_order_view`"; + private static final String DROPVIEW_SQL2 = "drop table `ssb`.`ke_table1`"; + private static final String DROPVIEW_SQL3 = "drop table `ssb`.`ke_order_view`"; + private static final String DROPVIEW_SQL4 = "drop table `ke_table2`"; + private static final String SHOWVIEW_SQL = "show create table ssb.ke_order_view"; + + @AfterClass + public static void tearDownResource() { + staticCleanupTestMetadata(); + } + + @Before + public void setup() { + createTestMetadata(); + Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN); + SecurityContextHolder.getContext().setAuthentication(authentication); + ReflectionTestUtils.setField(ddlService, "userGroupService", userGroupService); + } + + @After + public void cleanup() { + cleanupTestMetadata(); + } + + @Test + public void testDDL() { + try { + assertKylinExeption( + () -> + ddlService.executeDDLSql("ssb", CREATEVIEW_SQL5), + "DDL function has not been turned on."); + + getTestConfig().setProperty("kylin.source.ddl.enabled", "true"); + NTableMetadataManager tableManager = NTableMetadataManager.getInstance(getTestConfig(), "ssb"); + SparkDDLTestUtils.prepare(); + ddlService.executeDDLSql("ssb", CREATEVIEW_SQL5); + assertKylinExeption( + () -> + ddlService.executeDDLSql("ssb", 
CREATEVIEW_SQL2), + MsgPicker.getMsg().getDDLViewNameError()); + assertKylinExeption(() -> + ddlService.executeDDLSql("ssb", CREATEVIEW_SQL3), ""); + assertKylinExeption(() -> + ddlService.executeDDLSql("ssb", CREATEVIEW_SQL6), ""); + assertKylinExeption( + () -> + ddlService.executeDDLSql("ssb", CREATEVIEW_SQL4), + MsgPicker.getMsg().getDDLTableNotLoad("SSB.unload_table")); + assertKylinExeption( + () -> + ddlService.executeDDLSql("ssb", DROPVIEW_SQL2), + MsgPicker.getMsg().getDDLDropError()); + assertKylinExeption( + () -> + ddlService.executeDDLSql("ssb", DROPVIEW_SQL3), ""); + + ddlService.executeDDLSql("ssb", CREATEVIEW_SQL1); + ddlService.executeDDLSql("ssb", ALTERVIEW_SQL); + String createViewSQL = ddlService.executeDDLSql("ssb", SHOWVIEW_SQL); + Assert.assertTrue(createViewSQL.contains("ke_order_view")); + ddlService.executeDDLSql("ssb", DROPVIEW_SQL1); + + Authentication authentication = new TestingAuthenticationToken("USER1", + "", Constant.GROUP_ALL_USERS); + SecurityContextHolder.getContext().setAuthentication(authentication); + assertKylinExeption( + () -> + ddlService.executeDDLSql("ssb", CREATEVIEW_SQL1), + MsgPicker.getMsg().getDDLPermissionDenied()); + + // ddl description + List<List<String>> description = ddlService.pluginsDescription("ssb"); + Assert.assertTrue(description.size() > 0); + } finally { + SparkSession spark = SparderEnv.getSparkSession(); + if (spark != null && !spark.sparkContext().isStopped()) { + spark.stop(); + } + } + } +} diff --git a/src/datasource-service/src/test/scala/org/apache/spark/sql/common/SparkDDLTestUtils.scala b/src/datasource-service/src/test/scala/org/apache/spark/sql/common/SparkDDLTestUtils.scala new file mode 100644 index 0000000000..22af162c2a --- /dev/null +++ b/src/datasource-service/src/test/scala/org/apache/spark/sql/common/SparkDDLTestUtils.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.common + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.kylin.common.util.RandomUtil + +import org.apache.spark.sql.{SparderEnv, SparkSession} +import org.apache.spark.SparkConf +import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.util.Utils + +object SparkDDLTestUtils { + + def prepare(): Unit = { + val conf = new SparkConf(false) + val warehouse = s"./spark-warehouse/${RandomUtil.randomUUIDStr()}"; + val file = new File(warehouse) + if (file.exists()) { + FileUtils.deleteDirectory(file) + } + conf.set(StaticSQLConf.WAREHOUSE_PATH.key, warehouse) + // Copied from TestHive + // HDFS root scratch dir requires the write all (733) permission. For each connecting user, + // an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, with + // ${hive.scratch.dir.permission}. To resolve the permission issue, the simplest way is to + // delete it. Later, it will be re-created with the right permission. 
+ val scratchDir = Utils.createTempDir() + if (scratchDir.exists()) { + FileUtils.deleteDirectory(scratchDir) + } + conf.set(ConfVars.SCRATCHDIR.varname, scratchDir.toString) + conf.set("spark.hadoop.javax.jdo.option.ConnectionURL", "jdbc:derby:memory:db;create=true") + val sparkSession = SparkSession.builder + .master("local[2]") + .appName(getClass.getSimpleName) + .config("fs.file.impl", classOf[DebugFilesystem].getCanonicalName) + .config(conf) + .enableHiveSupport() + .getOrCreate + SparderEnv.setSparkSession(sparkSession) + prepareTable + } + + def prepareTable(): Unit = { + val spark = SparderEnv.getSparkSession + spark.sql("CREATE DATABASE if not exists SSB") + spark.sql("drop table if exists `ssb`.`lineorder`") + spark.sql( + s""" + |CREATE TABLE if not exists `ssb`.`lineorder`( + | `lo_orderkey` bigint, + | `lo_linenumber` bigint, + | `lo_custkey` int, + | `lo_partkey` int, + | `lo_suppkey` int, + | `lo_orderdate` int, + | `lo_orderpriotity` string, + | `lo_shippriotity` int, + | `lo_quantity` bigint, + | `lo_extendedprice` bigint, + | `lo_ordtotalprice` bigint, + | `lo_discount` bigint, + | `lo_revenue` bigint, + | `lo_supplycost` bigint, + | `lo_tax` bigint, + | `lo_commitdate` int, + | `lo_shipmode` string)""".stripMargin) + spark.sql("drop view if exists `ssb`.p_lineorder") + spark.sql( + s""" + |CREATE VIEW if not exists `ssb`.`p_lineorder` AS SELECT + | `lineorder`.`lo_orderkey`, + | `lineorder`.`lo_linenumber`, + | `lineorder`.`lo_custkey`, + | `lineorder`.`lo_partkey`, + | `lineorder`.`lo_suppkey`, + | `lineorder`.`lo_orderdate`, + | `lineorder`.`lo_orderpriotity`, + | `lineorder`.`lo_shippriotity`, + | `lineorder`.`lo_quantity`, + | `lineorder`.`lo_extendedprice`, + | `lineorder`.`lo_ordtotalprice`, + | `lineorder`.`lo_discount`, + | `lineorder`.`lo_revenue`, + | `lineorder`.`lo_supplycost`, + | `lineorder`.`lo_tax`, + | `lineorder`.`lo_commitdate`, + | `lineorder`.`lo_shipmode`, + | `lineorder`.`lo_extendedprice`*`lineorder`.`lo_discount` AS 
`V_REVENUE` + |FROM `ssb`.`LINEORDER` + """.stripMargin) + spark.sql("drop table if exists `ssb`.`customer`") + spark.sql( + s""" + |CREATE TABLE if not exists `ssb`.`customer`( + | `c_custkey` int, + | `c_name` string, + | `c_address` string, + | `c_city` string, + | `c_nation` string, + | `c_region` string, + | `c_phone` string, + | `c_mktsegment` string) + """.stripMargin) + spark.sql( + s""" + |CREATE TABLE if not exists `ssb`.`unload_table`( + | `c1` int, + | `c2` string) + """.stripMargin) + spark.sql( + s""" + |CREATE TABLE if not exists `ssb`.`ke_table1`( + | `c1` int, + | `c2` string) + """.stripMargin) + } +} diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/SparkDDLController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/SparkDDLController.java new file mode 100644 index 0000000000..38c2e9d003 --- /dev/null +++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/SparkDDLController.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.rest.controller; + +import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON; +import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON; + +import java.util.List; + +import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.rest.request.ViewDDLRequest; +import org.apache.kylin.rest.response.EnvelopeResponse; +import org.apache.kylin.rest.service.SparkDDLService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; + +@RestController +@RequestMapping(value = "/api/spark_source", produces = {HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON, + HTTP_VND_APACHE_KYLIN_JSON}) +@Slf4j +public class SparkDDLController extends NBasicController { + + @Autowired + private SparkDDLService sparkDDLService; + + @ApiOperation(value = "ddl") + @PostMapping(value = "/ddl") + @ResponseBody + public EnvelopeResponse<String> executeSQL(@RequestBody ViewDDLRequest request) + throws Exception { + checkProjectName(request.getProject()); + String result = sparkDDLService.executeDDLSql(request.getProject(), request.getSql()); + return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, result, ""); + } + + @ApiOperation(value = "ddl_description") + @GetMapping(value = "/ddl/description") + @ResponseBody + public EnvelopeResponse<List<List<String>>> description(@RequestParam("project") String project) { + checkProjectName(project); + return new 
EnvelopeResponse<>(KylinException.CODE_SUCCESS, + sparkDDLService.pluginsDescription(project), ""); + } +} diff --git a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/SparkDDLControllerTest.java b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/SparkDDLControllerTest.java new file mode 100644 index 0000000000..f7d9938989 --- /dev/null +++ b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/SparkDDLControllerTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.rest.controller; + +import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON; + +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.request.ViewDDLRequest; +import org.apache.kylin.rest.service.SparkDDLService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.http.MediaType; +import org.springframework.security.authentication.TestingAuthenticationToken; +import org.springframework.security.core.Authentication; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; +import org.springframework.test.web.servlet.result.MockMvcResultMatchers; +import org.springframework.test.web.servlet.setup.MockMvcBuilders; + +public class SparkDDLControllerTest extends NLocalFileMetadataTestCase { + private MockMvc mockMvc; + + @Mock + private SparkDDLService sparkDDLService; + + @InjectMocks + private SparkDDLController ddlController = Mockito.spy(new SparkDDLController()); + + private final Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN); + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + mockMvc = MockMvcBuilders.standaloneSetup(ddlController) + .defaultRequest(MockMvcRequestBuilders.get("/")).build(); + SecurityContextHolder.getContext().setAuthentication(authentication); + overwriteSystemProp("HADOOP_USER_NAME", "root"); + createTestMetadata(); + } + + @After + public void tearDown() { + cleanupTestMetadata(); + } + + @Test + public void testExecuteSQL() throws 
Exception { + ViewDDLRequest request = new ViewDDLRequest(); + request.setProject("ssb"); + + mockMvc.perform(MockMvcRequestBuilders.post("/api/spark_source/ddl") + .contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(request)) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()).andReturn(); + } + + @Test + public void testDescription() throws Exception { + mockMvc.perform(MockMvcRequestBuilders.get("/api/spark_source/ddl/description?project=ssb") + .contentType(MediaType.APPLICATION_JSON) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()).andReturn(); + } +} \ No newline at end of file diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java index fdc55553ad..b99b9727f7 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java @@ -28,8 +28,10 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.security.UserGroupInformation; import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; @@ -165,6 +167,41 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD return isAccess; } + public boolean checkDatabaseHadoopAccessFast(String database) throws Exception { + boolean isAccess = true; + val spark = SparderEnv.getSparkSession(); + try { + String 
databaseLocation = spark.catalog().getDatabase(database).locationUri(); + RemoteIterator<FileStatus> tablesIterator = getFilesIterator(databaseLocation); + if (tablesIterator.hasNext()) { + Path tablePath = tablesIterator.next().getPath(); + getFilesIterator(tablePath.toString()); + } + } catch (Exception e) { + isAccess = false; + try { + logger.error("Read hive database {} error:{}, ugi name: {}.", database, e.getMessage(), + UserGroupInformation.getCurrentUser().getUserName()); + } catch (IOException ex) { + logger.error("fetch user curr ugi info error.", ex); + } + } + return isAccess; + } + + private RemoteIterator<FileStatus> getFilesIterator(String location) throws IOException { + String hiveSpecFsLocation = SparderEnv.getSparkSession().sessionState().conf() + .getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION()); + FileSystem fs = null == hiveSpecFsLocation ? HadoopUtil.getWorkingFileSystem() + : HadoopUtil.getFileSystem(hiveSpecFsLocation); + if (location.startsWith(fs.getScheme()) || location.startsWith("/")) { + fs.listStatus(new Path(location)); + return fs.listStatusIterator(new Path(location)); + } else { + return HadoopUtil.getFileSystem(location).listStatusIterator(new Path(location)); + } + } + @Override public Pair<TableDesc, TableExtDesc> loadTableMetadata(final String database, String tableName, String prj) throws Exception { diff --git a/src/spark-project/spark-ddl-plugin/pom.xml b/src/spark-project/spark-ddl-plugin/pom.xml new file mode 100644 index 0000000000..9df7219174 --- /dev/null +++ b/src/spark-project/spark-ddl-plugin/pom.xml @@ -0,0 +1,58 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>kylin</artifactId> + <groupId>org.apache.kylin</groupId> + <version>5.0.0-SNAPSHOT</version> + 
<relativePath>../../../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>kylin-spark-ddl</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-systools</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-metadata</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-sparder</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.12</artifactId> + </dependency> + <dependency> + <groupId>org.junit.vintage</groupId> + <artifactId>junit-vintage-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-engine-spark</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheck.java b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheck.java new file mode 100644 index 0000000000..10e3e3439c --- /dev/null +++ b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheck.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.spark.ddl; + +import static org.apache.kylin.common.exception.ServerErrorCode.DDL_CHECK_ERROR; + +import org.apache.kylin.common.exception.KylinException; + +public interface DDLCheck { + + default String[] description(String project) { + return new String[] {"", ""}; + } + + void check(DDLCheckContext context); + + default void throwException(String msg) { + throw new KylinException(DDL_CHECK_ERROR, msg); + } +} diff --git a/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheckContext.java b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheckContext.java new file mode 100644 index 0000000000..e5e6d1819f --- /dev/null +++ b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheckContext.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.spark.ddl; + +import java.util.Set; + +public class DDLCheckContext { + private String sql; + private String project; + private String userName; + private Set<String> groups; + + public DDLCheckContext(String sql, String project, String userName, Set<String> groups) { + this.sql = sql; + this.project = project; + this.userName = userName; + this.groups = groups; + } + + public String getSql() { + return sql; + } + + public String getProject() { + return project; + } + + public String getUserName() { + return userName; + } + + public Set<String> getGroups() { + return groups; + } +} diff --git a/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/SourceTableCheck.java b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/SourceTableCheck.java new file mode 100644 index 0000000000..c3c54623e5 --- /dev/null +++ b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/SourceTableCheck.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.spark.ddl; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.msg.MsgPicker; +import org.apache.kylin.metadata.model.ISourceAware; +import org.apache.kylin.metadata.model.NTableMetadataManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.rest.util.AclPermissionUtil; +import org.apache.spark.sql.SparderEnv; +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; + +import lombok.val; +import scala.collection.Seq; + +public class SourceTableCheck implements DDLCheck { + + @Override + public String[] description(String project) { + return new String[] { + "The source table used to define the view needs to be loaded into the data source already", + "定义 view 用到的来源表需要已经加载到数据源" + }; + } + + @Override + public void check(DDLCheckContext context) { + val spark = SparderEnv.getSparkSession(); + LogicalPlan logicalPlan = null; + try { + logicalPlan = spark.sessionState().sqlParser().parsePlan(context.getSql()); + } catch (Throwable t) { + throwException(t.getMessage()); + } + val tableManager = NTableMetadataManager.getInstance( + KylinConfig.getInstanceFromEnv(), + context.getProject()); + if (!AclPermissionUtil.hasProjectAdminPermission(context.getProject(), context.getGroups())) { + throwException(MsgPicker.getMsg().getDDLPermissionDenied()); + } + Seq<LogicalPlan> relationLeaves = logicalPlan.collectLeaves(); + if (relationLeaves == null) { + return; + } + for (LogicalPlan plan : 
scala.collection.JavaConverters.seqAsJavaListConverter(relationLeaves).asJava()) { + if (plan instanceof UnresolvedRelation) { + val tableName = ((UnresolvedRelation) plan).tableName(); + TableDesc tableDesc = tableManager.getTableDesc(tableName); + if (tableDesc == null) { + throwException(MsgPicker.getMsg().getDDLTableNotLoad(tableName)); + } + if (ISourceAware.ID_HIVE != tableDesc.getSourceType() + && ISourceAware.ID_SPARK != tableDesc.getSourceType()) { + throwException(MsgPicker.getMsg().getDDLTableNotSupport(tableName)); + } + } + } + } +} diff --git a/src/spark-project/spark-ddl-plugin/src/main/resources/META-INF/services/org.apache.kylin.spark.ddl.DDLCheck b/src/spark-project/spark-ddl-plugin/src/main/resources/META-INF/services/org.apache.kylin.spark.ddl.DDLCheck new file mode 100644 index 0000000000..4f3a7527a8 --- /dev/null +++ b/src/spark-project/spark-ddl-plugin/src/main/resources/META-INF/services/org.apache.kylin.spark.ddl.DDLCheck @@ -0,0 +1,2 @@ +org.apache.kylin.spark.ddl.SourceTableCheck +org.apache.kylin.spark.ddl.ViewCheck \ No newline at end of file diff --git a/src/spark-project/spark-ddl-plugin/src/main/scala/org/apache/kylin/spark/ddl/ViewCheck.scala b/src/spark-project/spark-ddl-plugin/src/main/scala/org/apache/kylin/spark/ddl/ViewCheck.scala new file mode 100644 index 0000000000..4fc451bdf2 --- /dev/null +++ b/src/spark-project/spark-ddl-plugin/src/main/scala/org/apache/kylin/spark/ddl/ViewCheck.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.spark.ddl + +import java.security.PrivilegedExceptionAction + +import scala.collection.mutable.ListBuffer + +import org.apache.kylin.common.msg.MsgPicker +import org.apache.kylin.engine.spark.source.NSparkMetadataExplorer +import org.apache.kylin.rest.security.KerberosLoginManager +import org.slf4j.LoggerFactory + +import org.apache.spark.sql.SparderEnv +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.{CommandExecutionMode, CommandResultExec, SparkPlan} +import org.apache.spark.sql.execution.command._ + +class ViewCheck extends DDLCheck { + private val log = LoggerFactory.getLogger(classOf[ViewCheck]) + private val PREFIX = "KE_" + private val source = new NSparkMetadataExplorer + + override def description(project: String): Array[String] = { + val databasesHasAccess = listAllDatabasesHasAccess(project) + Array( + "View name should start with `KE_`\n" + + "Only support `create view`,`alter view`,`drop view`,`show create table` syntax\n" + + s"Only supports creating views in ${databasesHasAccess}", + "View 名称需要以`KE_`开头\n" + + "仅支持 `create view`, `drop view`, `alter view`, `show create table` 语法\n" + + s"仅支持在 ${databasesHasAccess} 上述 database 中创建 view") + } + + override def check(context: DDLCheckContext): Unit = { + log.info("start checking DDL view name") + val sql = context.getSql + val project = context.getProject + val spark = SparderEnv.getSparkSession + var plan: SparkPlan = null + try { + val logicalPlan = spark.sessionState.sqlParser.parsePlan(sql) + plan = 
stripRootCommandResult(spark.sessionState.executePlan( + logicalPlan, CommandExecutionMode.SKIP).executedPlan) + } catch { + case e: Exception => throwException(e.getMessage) + } + plan match { + case ExecutedCommandExec(view: CreateViewCommand) => + checkTableName(view.name) + checkAccess(view.name, project) + case ExecutedCommandExec(view: ShowCreateTableCommand) => + checkTableName(view.table) + checkAccess(view.table, project) + case ExecutedCommandExec(table: DropTableCommand) => + checkTableName(table.tableName) + checkAccess(table.tableName, project) + if (!table.isView) { + throwException(MsgPicker.getMsg.getDDLDropError) + } + case ExecutedCommandExec(table: AlterViewAsCommand) => + checkTableName(table.name) + checkAccess(table.name, project) + case _ => throwException(MsgPicker.getMsg.getDDLUnSupported) + } + } + + private def checkTableName(identifier: TableIdentifier): Unit = { + if (!identifier.table.toUpperCase().startsWith(PREFIX)) { + throwException(MsgPicker.getMsg.getDDLViewNameError) + } + } + + def checkAccess(identifier: TableIdentifier, project: String): Unit = { + val database = identifier.database.getOrElse(SparderEnv.getSparkSession.catalog.currentDatabase) + val ugi = KerberosLoginManager.getInstance.getProjectUGI(project) + val hasDatabaseAccess = ugi.doAs(new PrivilegedExceptionAction[Boolean]() { + override def run(): Boolean = { + source.checkDatabaseHadoopAccessFast(database) + } + }) + if (!hasDatabaseAccess) { + throwException(MsgPicker.getMsg.getDDLDatabaseAccessnDenied) + } + } + + def listAllDatabasesHasAccess(project: String): String = { + val ugi = KerberosLoginManager.getInstance.getProjectUGI(project) + val databasesHasAccess = ugi.doAs(new PrivilegedExceptionAction[List[String]]() { + override def run(): List[String] = { + val databases = source.listDatabases() + val databasesHasAccess = ListBuffer[String]() + databases.forEach(db => { + if (source.checkDatabaseHadoopAccessFast(db)) { + databasesHasAccess.append(db) + } + }) + databasesHasAccess.toList + } + }) + 
databasesHasAccess.mkString(",") + } + + private def stripRootCommandResult(executedPlan: SparkPlan) = executedPlan match { + case CommandResultExec(_, plan, _) => plan + case other => other + } +}