This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4be76ac572027e8f481ffcc82a70b520639f878c
Author: Jiale He <jialeheb...@gmail.com>
AuthorDate: Wed Dec 14 20:16:41 2022 +0800

    KYLIN-5426 fix spark collect ddl
---
 .../kylin/rest/service/SparkSourceServiceTest.java | 28 ++++++++++--
 .../scala/org/apache/spark/sql/DdlOperation.scala  | 53 +++++++++++++---------
 2 files changed, 54 insertions(+), 27 deletions(-)

diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
index 54be7892b4..ba52319978 100644
--- a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
+++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
@@ -36,9 +37,11 @@ import org.apache.kylin.rest.request.DDLRequest;
 import org.apache.kylin.rest.response.DDLResponse;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.DDLDesc;
+import org.apache.spark.sql.DdlOperation;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -185,13 +188,13 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
 
     @Test
     public void testDatabaseExists() {
-        Assert.assertEquals(true, sparkSourceService.databaseExists("default"));
+        Assert.assertTrue(sparkSourceService.databaseExists("default"));
     }
 
     @Test
     public void testDropTable() throws AnalysisException {
         sparkSourceService.dropTable("default", "COUNTRY");
-        Assert.assertEquals(false, sparkSourceService.tableExists("default", "COUNTRY"));
+        Assert.assertFalse(sparkSourceService.tableExists("default", "COUNTRY"));
     }
 
     @Test
@@ -202,6 +205,7 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
 
     @Test
     public void testExportTables() {
+        // hive data source
         String expectedTableStructure = "CREATE EXTERNAL TABLE `default`.`hive_bigints`( `id` BIGINT) "
                 + "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' "
                 + "WITH SERDEPROPERTIES ( 'serialization.format' = '1') STORED AS "
@@ -210,12 +214,26 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
                 + "LOCATION 'file:/tmp/parquet_data' ";
         sparkSourceService.executeSQL(
                 "CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '/tmp/parquet_data'");
-        String actureTableStructure = sparkSourceService.exportTables("default", new String[] { "hive_bigints" })
-                .getTables().get("hive_bigints");
-        Assert.assertEquals(actureTableStructure.substring(0, actureTableStructure.lastIndexOf("TBLPROPERTIES")),
-                expectedTableStructure);
+        Assert.assertTrue(DdlOperation.isHiveTable("default", "hive_bigints"));
+
+        // spark datasource
+        sparkSourceService.executeSQL(
+                "CREATE EXTERNAL TABLE spark_bigints(id bigint) USING PARQUET LOCATION '/tmp/parquet_data_spark'");
+        Assert.assertFalse(DdlOperation.isHiveTable("default", "spark_bigints"));
+        String sparkDDL = sparkSourceService.exportTables("default", new String[] { "spark_bigints" }).getTables()
+                .get("spark_bigints");
+        Assert.assertFalse(sparkDDL.isEmpty());
+        Assert.assertTrue(StringUtils.containsIgnoreCase(sparkDDL, "USING PARQUET"));
+
+        // view
+        sparkSourceService.executeSQL("CREATE VIEW view_bigints as select id from default.spark_bigints");
+        String viewDDL = DdlOperation.collectDDL(TableIdentifier.apply("view_bigints"),
+                "show create view default.view_bigints");
+        Assert.assertFalse(StringUtils.isEmpty(viewDDL));
     }
 
     @Test
@@ -269,7 +287,7 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testTableExists() throws IOException {
+    public void testTableExists() {
         Assert.assertTrue(sparkSourceService.tableExists("default", "COUNTRY"));
     }
 }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala
index 13099111cb..f1db6006b1 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala
@@ -17,19 +17,18 @@
  */
 package org.apache.spark.sql
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DDLDesc.DDLType
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.{CommandExecutionMode, CommandResultExec, QueryExecution, SparkPlan}
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, CreateDatabaseCommand, CreateTableCommand, CreateViewCommand, DropDatabaseCommand, DropTableCommand, ExecutedCommandExec, ShowCreateTableAsSerdeCommand, ShowPartitionsCommand}
 import org.apache.spark.sql.types.StructField
+
 import java.lang.{String => JString}
 import java.util.{List => JList}
-
-import org.apache.spark.internal.Logging
-
 import scala.collection.JavaConverters._
@@ -108,31 +107,41 @@ object DdlOperation extends Logging {
   }
 
   def getTableDesc(database: String, table: String): String = {
-    val sql = s"show create table ${database}.${table} as serde"
-    var ddl = ""
-    val logicalPlan: LogicalPlan = SparderEnv.getSparkSession.sessionState.sqlParser.parsePlan(sql)
+    var sql = s"SHOW CREATE TABLE $database.$table"
+    sql = if (isHiveTable(database, table)) sql + " AS SERDE" else sql
+    val logicalPlan = SparderEnv.getSparkSession.sessionState.sqlParser.parsePlan(sql)
     val queryExecution: QueryExecution = SparderEnv.getSparkSession.sessionState.executePlan(logicalPlan,
       CommandExecutionMode.SKIP)
     stripRootCommandResult(queryExecution.executedPlan) match {
-      case ExecutedCommandExec(show: ShowCreateTableAsSerdeCommand) =>
-        val catalog: SessionCatalog = SparderEnv.getSparkSession.sessionState.catalog
-        val metadata: CatalogTable = catalog.getTableMetadata(show.table)
-        metadata.tableType match {
-          case CatalogTableType.VIEW =>
-            val builder = new StringBuilder
-            builder ++= s"CREATE VIEW ${show.table.quotedString}"
-            if (metadata.schema.nonEmpty) {
-              builder ++= metadata.schema.map(_.toViewDDL).mkString("(", ", ", ")")
-            }
-            builder ++= metadata.viewText.mkString(" AS\n", "", "\n")
-            ddl = builder.toString()
-          case CatalogTableType.MANAGED => ddl = ""
-          case CatalogTableType.EXTERNAL => ddl = SparderEnv.getSparkSession.sql(sql).takeAsList(1).get(0).getString(0)
+      case ExecutedCommandExec(show: ShowCreateTableCommand) => collectDDL(show.table, sql)
+      case ExecutedCommandExec(show: ShowCreateTableAsSerdeCommand) => collectDDL(show.table, sql)
+    }
+  }
+
+  def isHiveTable(database: String, table: String): Boolean = {
+    val tableMetadata = SparderEnv.getSparkSession.sessionState.catalog
+      .getTableRawMetadata(TableIdentifier(table, Some(database)))
+    !DDLUtils.isDatasourceTable(tableMetadata)
+  }
+
+  def collectDDL(tableIdentifier: TableIdentifier, sql: String): String = {
+    val catalog: SessionCatalog = SparderEnv.getSparkSession.sessionState.catalog
+    val metadata: CatalogTable = catalog.getTableMetadata(tableIdentifier)
+    metadata.tableType match {
+      case CatalogTableType.VIEW =>
+        val builder = new StringBuilder
+        builder ++= s"CREATE VIEW ${tableIdentifier.quotedString}"
+        if (metadata.schema.nonEmpty) {
+          builder ++= metadata.schema.map(_.toViewDDL).mkString("(", ", ", ")")
         }
+        builder ++= metadata.viewText.mkString(" AS\n", "", "\n")
+        builder.toString()
+      case CatalogTableType.MANAGED => ""
+      case CatalogTableType.EXTERNAL => SparderEnv.getSparkSession.sql(sql).takeAsList(1).get(0).getString(0)
     }
-    ddl
   }
+
   def calculatePartition(database: String, table: String): Seq[Row] = {
     val logicalPlan: LogicalPlan = SparderEnv.getSparkSession.sessionState.sqlParser.parsePlan(s"show partitions ${database}.${table}")
     val queryExecution: QueryExecution = SparderEnv.getSparkSession.sessionState.executePlan(logicalPlan,
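
For context on why getTableDesc now branches: "SHOW CREATE TABLE ... AS SERDE" only renders
Hive-serde tables and is rejected with an AnalysisException for Spark datasource tables (those
created with "USING ...", for which DDLUtils.isDatasourceTable returns true), while the plain
"SHOW CREATE TABLE" form handles datasource tables. Below is a minimal standalone sketch of the
two variants, assuming Spark 3.1+ with Hive support on the classpath; it is not part of the
patch, and the table names are illustrative:

    import org.apache.spark.sql.SparkSession

    object ShowCreateTableSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .enableHiveSupport() // requires the spark-hive module
          .getOrCreate()

        // Hive serde table: the patch's isHiveTable() returns true, so the
        // fixed getTableDesc() appends "AS SERDE" and gets Hive-style DDL.
        spark.sql("CREATE TABLE hive_t(id BIGINT) STORED AS PARQUET")
        println(spark.sql("SHOW CREATE TABLE hive_t AS SERDE").head.getString(0))

        // Spark datasource table: the old code sent "AS SERDE" here too and
        // failed; the plain form returns "CREATE TABLE ... USING parquet" DDL.
        spark.sql("CREATE TABLE spark_t(id BIGINT) USING PARQUET")
        println(spark.sql("SHOW CREATE TABLE spark_t").head.getString(0))

        spark.stop()
      }
    }

Views take a third path: collectDDL rebuilds "CREATE VIEW ..." from catalog metadata rather than
delegating to SHOW CREATE TABLE output, which is what the new view assertions in
SparkSourceServiceTest exercise.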