This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4be76ac572027e8f481ffcc82a70b520639f878c
Author: Jiale He <jialeheb...@gmail.com>
AuthorDate: Wed Dec 14 20:16:41 2022 +0800

    KYLIN-5426 fix spark collect ddl
---
 .../kylin/rest/service/SparkSourceServiceTest.java | 28 ++++++++++--
 .../scala/org/apache/spark/sql/DdlOperation.scala  | 53 +++++++++++++---------
 2 files changed, 54 insertions(+), 27 deletions(-)
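
Background for the change: the old getTableDesc always ran "SHOW CREATE TABLE ... AS SERDE", which only applies to Hive serde tables. The patch inspects the table metadata first and appends "AS SERDE" only for Hive tables, so Spark datasource tables (e.g. USING PARQUET) take the plain "SHOW CREATE TABLE" path, and the DDL assembly moves into a shared collectDDL helper. A rough sketch of that decision follows; it is not part of the commit, assumes the code sits in the org.apache.spark.sql package (like DdlOperation) so the session catalog and DDLUtils are reachable, and the object name is illustrative only:

    package org.apache.spark.sql

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.command.DDLUtils

    object ShowCreateSqlSketch {
      // Pick the SHOW CREATE TABLE variant the same way the patch does:
      // Hive serde tables need "AS SERDE", Spark datasource tables must not use it.
      def showCreateSql(spark: SparkSession, database: String, table: String): String = {
        val meta = spark.sessionState.catalog
          .getTableRawMetadata(TableIdentifier(table, Some(database)))
        val base = s"SHOW CREATE TABLE $database.$table"
        if (DDLUtils.isDatasourceTable(meta)) base else base + " AS SERDE"
      }
    }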

diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
index 54be7892b4..ba52319978 100644
--- a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
+++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
@@ -36,9 +37,11 @@ import org.apache.kylin.rest.request.DDLRequest;
 import org.apache.kylin.rest.response.DDLResponse;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.DDLDesc;
+import org.apache.spark.sql.DdlOperation;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -185,13 +188,13 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
 
     @Test
     public void testDatabaseExists() {
-        Assert.assertEquals(true, sparkSourceService.databaseExists("default"));
+        Assert.assertTrue(sparkSourceService.databaseExists("default"));
     }
 
     @Test
     public void testDropTable() throws AnalysisException {
         sparkSourceService.dropTable("default", "COUNTRY");
-        Assert.assertEquals(false, sparkSourceService.tableExists("default", "COUNTRY"));
+        Assert.assertFalse(sparkSourceService.tableExists("default", "COUNTRY"));
     }
 
     @Test
@@ -202,6 +205,7 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
 
     @Test
     public void testExportTables() {
+        // hive data source
         String expectedTableStructure = "CREATE EXTERNAL TABLE `default`.`hive_bigints`(   `id` BIGINT) "
                 + "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' "
                 + "WITH SERDEPROPERTIES (   'serialization.format' = '1') STORED AS   "
@@ -210,12 +214,26 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
                 + "LOCATION 'file:/tmp/parquet_data' ";
         sparkSourceService.executeSQL(
                 "CREATE EXTERNAL TABLE hive_bigints(id bigint)  STORED AS PARQUET LOCATION '/tmp/parquet_data'");
-
         String actureTableStructure = sparkSourceService.exportTables("default", new String[] { "hive_bigints" })
                 .getTables().get("hive_bigints");
-
         Assert.assertEquals(actureTableStructure.substring(0, actureTableStructure.lastIndexOf("TBLPROPERTIES")),
                 expectedTableStructure);
+        Assert.assertTrue(DdlOperation.isHiveTable("default", "hive_bigints"));
+
+        // spark datasource
+        sparkSourceService.executeSQL(
+                "CREATE EXTERNAL TABLE spark_bigints(id bigint) USING PARQUET LOCATION '/tmp/parquet_data_spark'");
+        Assert.assertFalse(DdlOperation.isHiveTable("default", "spark_bigints"));
+        String sparkDDL = sparkSourceService.exportTables("default", new String[] { "spark_bigints" }).getTables()
+                .get("spark_bigints");
+        Assert.assertFalse(sparkDDL.isEmpty());
+        Assert.assertTrue(StringUtils.containsIgnoreCase(sparkDDL, "USING PARQUET"));
+
+        // view
+        sparkSourceService.executeSQL("CREATE VIEW view_bigints as select id from default.spark_bigints");
+        String viewDDL = DdlOperation.collectDDL(TableIdentifier.apply("view_bigints"),
+                "show create view default.view_bigints");
+        Assert.assertFalse(StringUtils.isEmpty(viewDDL));
     }
 
     @Test
@@ -269,7 +287,7 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testTableExists() throws IOException {
+    public void testTableExists() {
         Assert.assertTrue(sparkSourceService.tableExists("default", "COUNTRY"));
     }
 }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala
index 13099111cb..f1db6006b1 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala
@@ -17,19 +17,18 @@
  */
 package org.apache.spark.sql
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DDLDesc.DDLType
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.{CommandExecutionMode, CommandResultExec, QueryExecution, SparkPlan}
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, CreateDatabaseCommand, CreateTableCommand, CreateViewCommand, DropDatabaseCommand, DropTableCommand, ExecutedCommandExec, ShowCreateTableAsSerdeCommand, ShowPartitionsCommand}
 import org.apache.spark.sql.types.StructField
+
 import java.lang.{String => JString}
 import java.util.{List => JList}
-
-import org.apache.spark.internal.Logging
-
 import scala.collection.JavaConverters._
 
 
@@ -108,31 +107,41 @@ object DdlOperation extends Logging {
   }
 
   def getTableDesc(database: String, table: String): String = {
-    val sql = s"show create table ${database}.${table} as serde"
-    var ddl = ""
-    val logicalPlan: LogicalPlan = SparderEnv.getSparkSession.sessionState.sqlParser.parsePlan(sql)
+    var sql = s"SHOW CREATE TABLE $database.$table"
+    sql = if (isHiveTable(database, table)) sql + " AS SERDE" else sql
+    val logicalPlan = SparderEnv.getSparkSession.sessionState.sqlParser.parsePlan(sql)
     val queryExecution: QueryExecution = SparderEnv.getSparkSession.sessionState.executePlan(logicalPlan,
       CommandExecutionMode.SKIP)
     stripRootCommandResult(queryExecution.executedPlan) match {
-      case ExecutedCommandExec(show: ShowCreateTableAsSerdeCommand) =>
-        val catalog: SessionCatalog = SparderEnv.getSparkSession.sessionState.catalog
-        val metadata: CatalogTable = catalog.getTableMetadata(show.table)
-        metadata.tableType match {
-          case CatalogTableType.VIEW =>
-            val builder = new StringBuilder
-            builder ++= s"CREATE VIEW ${show.table.quotedString}"
-            if (metadata.schema.nonEmpty) {
-              builder ++= metadata.schema.map(_.toViewDDL).mkString("(", ", ", ")")
-            }
-            builder ++= metadata.viewText.mkString(" AS\n", "", "\n")
-            ddl = builder.toString()
-          case CatalogTableType.MANAGED => ddl = ""
-          case CatalogTableType.EXTERNAL => ddl = SparderEnv.getSparkSession.sql(sql).takeAsList(1).get(0).getString(0)
+      case ExecutedCommandExec(show: ShowCreateTableCommand) => collectDDL(show.table, sql)
+      case ExecutedCommandExec(show: ShowCreateTableAsSerdeCommand) => collectDDL(show.table, sql)
+    }
+  }
+
+  def isHiveTable(database: String, table: String): Boolean = {
+    val tableMetadata = SparderEnv.getSparkSession.sessionState.catalog
+      .getTableRawMetadata(TableIdentifier(table, Some(database)))
+    !DDLUtils.isDatasourceTable(tableMetadata)
+  }
+
+  def collectDDL(tableIdentifier: TableIdentifier, sql: String): String = {
+    val catalog: SessionCatalog = SparderEnv.getSparkSession.sessionState.catalog
+    val metadata: CatalogTable = catalog.getTableMetadata(tableIdentifier)
+    metadata.tableType match {
+      case CatalogTableType.VIEW =>
+        val builder = new StringBuilder
+        builder ++= s"CREATE VIEW ${tableIdentifier.quotedString}"
+        if (metadata.schema.nonEmpty) {
+          builder ++= metadata.schema.map(_.toViewDDL).mkString("(", ", ", ")")
         }
+        builder ++= metadata.viewText.mkString(" AS\n", "", "\n")
+        builder.toString()
+      case CatalogTableType.MANAGED => ""
+      case CatalogTableType.EXTERNAL => SparderEnv.getSparkSession.sql(sql).takeAsList(1).get(0).getString(0)
     }
-    ddl
   }
 
+
   def calculatePartition(database: String, table: String): Seq[Row] = {
     val logicalPlan: LogicalPlan = SparderEnv.getSparkSession.sessionState.sqlParser.parsePlan(s"show partitions ${database}.${table}")
     val queryExecution: QueryExecution = SparderEnv.getSparkSession.sessionState.executePlan(logicalPlan,
