This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3da5608  [SPARK-37283][SQL] Don't try to store a V1 table which 
contains ANSI intervals in Hive compatible format
3da5608 is described below

commit 3da5608f26b8f743d595d618d168231d426fabf2
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Mon Nov 15 18:00:17 2021 +0800

    [SPARK-37283][SQL] Don't try to store a V1 table which contains ANSI 
intervals in Hive compatible format
    
    ### What changes were proposed in this pull request?
    
    If, a table being created contains a column of ANSI interval types and the 
underlying file format has a corresponding Hive SerDe (e.g. Parquet),
    `HiveExternalcatalog` tries to store the table in Hive compatible format.
    But, as ANSI interval types in Spark and interval type in Hive are not 
compatible (Hive only supports interval_year_month and interval_day_time), the 
following warning with stack trace will be logged.
    
    ```
    spark-sql> CREATE TABLE tbl1(a INTERVAL YEAR TO MONTH) USING Parquet;
    21/11/11 14:39:29 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, 
since hive.security.authorization.manager is set to instance of 
HiveAuthorizerFactory.
    21/11/11 14:39:29 WARN HiveExternalCatalog: Could not persist 
`default`.`tbl1` in a Hive compatible way. Persisting it into Hive metastore in 
Spark SQL specific format.
    org.apache.hadoop.hive.ql.metadata.HiveException: 
java.lang.IllegalArgumentException: Error: type expected at the position 0 of 
'interval year to month' but 'interval year to month' is found.
        at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:869)
        at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:874)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$createTable$1(HiveClientImpl.scala:553)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:303)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:234)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:233)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:283)
        at 
org.apache.spark.sql.hive.client.HiveClientImpl.createTable(HiveClientImpl.scala:551)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.saveTableIntoHive(HiveExternalCatalog.scala:499)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.createDataSourceTable(HiveExternalCatalog.scala:397)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createTable$1(HiveExternalCatalog.scala:274)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
        at 
org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:245)
        at 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:94)
        at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:376)
        at 
org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:120)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:287)
        at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.IllegalArgumentException: Error: type expected at the 
position 0 of 'interval year to month' but 'interval year to month' is found.
        at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:372)
        at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:355)
        at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseType(TypeInfoUtils.java:416)
        at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseTypeInfos(TypeInfoUtils.java:329)
        at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfosFromTypeString(TypeInfoUtils.java:814)
        at 
org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe.initialize(ParquetHiveSerDe.java:110)
        at 
org.apache.hadoop.hive.serde2.AbstractSerDe.initialize(AbstractSerDe.java:54)
        at 
org.apache.hadoop.hive.serde2.SerDeUtils.initializeSerDe(SerDeUtils.java:533)
        at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:453)
        at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:440)
        at 
org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:281)
        at 
org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:199)
        at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:842)
        ... 73 more
    21/11/11 14:39:29 WARN HiveConf: HiveConf of name 
hive.internal.ss.authz.settings.applied.marker does not exist
    21/11/11 14:39:29 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout 
does not exist
    21/11/11 14:39:29 WARN HiveConf: HiveConf of name hive.stats.retries.wait 
does not exist
    ```
    
    In such case, `HiveExternalCatalog` fallbacks to store the table in Spark 
specific format  but the stack trace is surprising and confusable.
    So the solution this PR proposes is not to try to create such tables in 
Hive compatible format because it should always fail, and logs a message which 
says the table is stored in Spark specific format.
    ### Why are the changes needed?
    
    To fix the confusable behavior.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New test.
    
    Closes #34551 from sarutak/fix-ansi-interval-hive-type.
    
    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/hive/HiveExternalCatalog.scala       | 19 +++++-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 68 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 2 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 568e814..179b424 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -46,8 +46,7 @@ import 
org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOpti
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
-import org.apache.spark.sql.types.{DataType, StructType}
-
+import org.apache.spark.sql.types.{AnsiIntervalType, ArrayType, DataType, 
MapType, StructType}
 
 /**
  * A persistent implementation of the system catalog using Hive.
@@ -357,6 +356,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 
     val qualifiedTableName = table.identifier.quotedString
     val maybeSerde = HiveSerDe.sourceToSerDe(provider)
+    val incompatibleTypes =
+      table.schema.filter(f => 
!isHiveCompatibleDataType(f.dataType)).map(_.dataType.simpleString)
 
     val (hiveCompatibleTable, logMessage) = maybeSerde match {
       case _ if options.skipHiveMetadata =>
@@ -365,6 +366,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
             "Spark SQL specific format, which is NOT compatible with Hive."
         (None, message)
 
+      case _ if incompatibleTypes.nonEmpty =>
+        val message =
+          s"Hive incompatible types found: ${incompatibleTypes.mkString(", 
")}. " +
+            s"Persisting data source table $qualifiedTableName into Hive 
metastore in " +
+            "Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
       // our bucketing is un-compatible with hive(different hash function)
       case Some(serde) if table.bucketSpec.nonEmpty =>
         val message =
@@ -1402,4 +1409,12 @@ object HiveExternalCatalog {
     provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)
   }
 
+  private[spark] def isHiveCompatibleDataType(dt: DataType): Boolean = dt 
match {
+    case _: AnsiIntervalType => false
+    case s: StructType => s.forall(f => isHiveCompatibleDataType(f.dataType))
+    case a: ArrayType => isHiveCompatibleDataType(a.elementType)
+    case m: MapType =>
+      isHiveCompatibleDataType(m.keyType) && 
isHiveCompatibleDataType(m.valueType)
+    case _ => true
+  }
 }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index ba44192..644d304 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
+import org.apache.log4j.Level
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -1403,6 +1404,73 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
     }
   }
 
+  test("SPARK-37283: Don't try to store a V1 table in Hive compatible format " 
+
+    "if the table contains Hive incompatible types") {
+    import DayTimeIntervalType.{DAY, HOUR, MINUTE, SECOND}
+    import YearMonthIntervalType.{MONTH, YEAR}
+    withTable("t") {
+      val logAppender = new LogAppender(
+        s"Check whether a message is shown and it says that the V1 table 
contains " +
+          "Hive incompatible types if the table contains Hive incompatible 
types")
+      logAppender.setThreshold(Level.WARN)
+      withLogAppender(logAppender) {
+        sql(
+          """
+            |CREATE TABLE t(
+            |  c1 INTERVAL DAY TO MINUTE,
+            |  c2 STRING,
+            |  c3 INTERVAL YEAR TO MONTH,
+            |  c4 INT,
+            |  c5 INTERVAL HOUR,
+            |  c6 INTERVAL MONTH,
+            |  c7 STRUCT<a: INT, b: STRING>,
+            |  c8 STRUCT<a: INT, b: INTERVAL HOUR TO SECOND>,
+            |  c9 ARRAY<INT>,
+            |  c10 ARRAY<INTERVAL YEAR>,
+            |  c11 MAP<INT, STRING>,
+            |  c12 MAP<INT, INTERVAL DAY>,
+            |  c13 MAP<INTERVAL MINUTE TO SECOND, STRING>
+            |) USING Parquet""".stripMargin)
+      }
+      val expectedMsg = "Hive incompatible types found: interval day to 
minute, " +
+        "interval year to month, interval hour, interval month, " +
+        "struct<a:int,b:interval hour to second>, " +
+        "array<interval year>, map<int,interval day>, " +
+        "map<interval minute to second,string>. " +
+        "Persisting data source table `default`.`t` into Hive metastore in " +
+        "Spark SQL specific format, which is NOT compatible with Hive."
+      val actualMessages = logAppender.loggingEvents
+        .map(_.getRenderedMessage)
+        .filter(_.contains("incompatible"))
+      assert(actualMessages.contains(expectedMsg))
+      assert(hiveClient.getTable("default", "t").schema
+        .forall(_.dataType == ArrayType(StringType)))
+
+      val df = sql("SELECT * FROM t")
+      assert(df.schema ===
+        StructType(Seq(
+          StructField("c1", DayTimeIntervalType(DAY, MINUTE)),
+          StructField("c2", StringType),
+          StructField("c3", YearMonthIntervalType(YEAR, MONTH)),
+          StructField("c4", IntegerType),
+          StructField("c5", DayTimeIntervalType(HOUR)),
+          StructField("c6", YearMonthIntervalType(MONTH)),
+          StructField("c7",
+            StructType(Seq(
+              StructField("a", IntegerType),
+              StructField("b", StringType)))),
+          StructField("c8",
+            StructType(Seq(
+              StructField("a", IntegerType),
+              StructField("b", DayTimeIntervalType(HOUR, SECOND))))),
+          StructField("c9", ArrayType(IntegerType)),
+          StructField("c10", ArrayType(YearMonthIntervalType(YEAR))),
+          StructField("c11", MapType(IntegerType, StringType)),
+          StructField("c12", MapType(IntegerType, DayTimeIntervalType(DAY))),
+          StructField("c13", MapType(DayTimeIntervalType(MINUTE, SECOND), 
StringType)))))
+    }
+  }
+
   private def withDebugMode(f: => Unit): Unit = {
     val previousValue = sparkSession.sparkContext.conf.get(DEBUG_MODE)
     try {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to