This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 52a2e63dd714 [SPARK-47850][SQL] Support `spark.sql.hive.convertInsertingUnpartitionedTable`
52a2e63dd714 is described below

commit 52a2e63dd7147e2701c9c26667fe5bd9fdc3f14c
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Thu Apr 18 15:05:15 2024 +0800

    [SPARK-47850][SQL] Support `spark.sql.hive.convertInsertingUnpartitionedTable`
    
    ### What changes were proposed in this pull request?
    
    This PR introduces a new configuration,
    `spark.sql.hive.convertInsertingUnpartitionedTable`, alongside the existing
    `spark.sql.hive.convertInsertingPartitionedTable`, to allow fine-grained
    switching between the Hive SerDe and the built-in data source writer when
    inserting into Parquet/ORC Hive tables.
    
    ### Why are the changes needed?
    
    In hybrid workloads, where Hive tables may be read and written by Hive,
    Spark, Impala, etc., we usually use the built-in data source for reading
    Parquet/ORC tables but the Hive SerDe for writing them. The existing
    configuration combination allows this for every case except unpartitioned
    tables.
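    
    As a minimal sketch of that hybrid setup (assuming a Hive-enabled
    SparkSession named `spark`; the chosen values are illustrative, not part of
    this commit):
    
        // Read Parquet/ORC Hive tables through the built-in data source readers.
        spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
        spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
        // Keep inserts on the Hive SerDe writer, for partitioned and
        // unpartitioned tables alike.
        spark.conf.set("spark.sql.hive.convertInsertingPartitionedTable", "false")
        spark.conf.set("spark.sql.hive.convertInsertingUnpartitionedTable", "false")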
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The newly added configuration
    `spark.sql.hive.convertInsertingUnpartitionedTable` defaults to `true`,
    which preserves the existing behavior.
    
    ### How was this patch tested?
    
    A new unit test is added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46052 from pan3793/SPARK-47850.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../plans/logical/basicLogicalOperators.scala      |  1 +
 .../apache/spark/sql/execution/command/views.scala |  1 +
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 10 ++++--
 .../org/apache/spark/sql/hive/HiveUtils.scala      | 10 ++++++
 .../execution/CreateHiveTableAsSelectCommand.scala |  5 ++-
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  7 ++++
 .../spark/sql/hive/orc/HiveOrcQuerySuite.scala     | 37 ++++++++++++++++++++++
 7 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 1c8f7a97dd7f..7c36e3bc79af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -783,6 +783,7 @@ object View {
         "spark.sql.hive.convertMetastoreParquet",
         "spark.sql.hive.convertMetastoreOrc",
         "spark.sql.hive.convertInsertingPartitionedTable",
+        "spark.sql.hive.convertInsertingUnpartitionedTable",
         "spark.sql.hive.convertMetastoreCtas"
       ).contains(key) || key.startsWith("spark.sql.catalog.")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index d71d0d43683c..cb5e7e7f42d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -360,6 +360,7 @@ object ViewHelper extends SQLConfHelper with Logging {
     "spark.sql.hive.convertMetastoreParquet",
     "spark.sql.hive.convertMetastoreOrc",
     "spark.sql.hive.convertInsertingPartitionedTable",
+    "spark.sql.hive.convertInsertingUnpartitionedTable",
     "spark.sql.hive.convertMetastoreCtas",
     SQLConf.ADDITIONAL_REMOTE_REPOSITORIES.key)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5972a9df78ec..e74cc088a1f6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, Ins
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
+import org.apache.spark.sql.hive.execution.InsertIntoHiveTable.BY_CTAS
 import org.apache.spark.sql.internal.HiveSerDe
 
 
@@ -194,6 +195,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
  * - When writing to non-partitioned Hive-serde Parquet/Orc tables
  * - When writing to partitioned Hive-serde Parquet/Orc tables when
  *   `spark.sql.hive.convertInsertingPartitionedTable` is true
+ * - When writing to unpartitioned Hive-serde Parquet/Orc tables when
+ *   `spark.sql.hive.convertInsertingUnpartitionedTable` is true
  * - When writing to directory with Hive-serde
  * - When writing to non-partitioned Hive-serde Parquet/ORC tables using CTAS
  * - When scanning Hive-serde Parquet/ORC tables
@@ -230,7 +233,8 @@ case class RelationConversions(
       case InsertIntoStatement(
           r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists, byName)
           if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-            (!r.isPartitioned || conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE))
+            ((r.isPartitioned && conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)) ||
+              (!r.isPartitioned && conf.getConf(HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE)))
             && isConvertible(r) =>
         InsertIntoStatement(metastoreCatalog.convert(r, isWrite = true), partition, cols,
           query, overwrite, ifPartitionNotExists, byName)
@@ -245,11 +249,11 @@ case class RelationConversions(
       // that only matches table insertion inside Hive CTAS.
       // This pattern would not cause conflicts because this rule is always applied before
       // `HiveAnalysis` and both of these rules are running once.
-      case InsertIntoHiveTable(
+      case i @ InsertIntoHiveTable(
         tableDesc, _, query, overwrite, ifPartitionNotExists, _, _, _, _, _, _)
           if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
             tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) &&
-            conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
+            conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) && i.getTagValue(BY_CTAS).isDefined =>
         // validation is required to be done here before relation conversion.
         DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema))
         val hiveTable = DDLUtils.readHiveTable(tableDesc)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 3fc761785acc..5f59e9ca95f9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -154,6 +154,16 @@ private[spark] object HiveUtils extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val CONVERT_INSERTING_UNPARTITIONED_TABLE =
+    buildConf("spark.sql.hive.convertInsertingUnpartitionedTable")
+      .doc("When set to true, and `spark.sql.hive.convertMetastoreParquet` or 
" +
+        "`spark.sql.hive.convertMetastoreOrc` is true, the built-in 
ORC/Parquet writer is used" +
+        "to process inserting into unpartitioned ORC/Parquet tables created by 
using the HiveSQL " +
+        "syntax.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val CONVERT_METASTORE_CTAS = buildConf("spark.sql.hive.convertMetastoreCtas")
     .doc("When set to true,  Spark will try to use built-in data source writer 
" +
       "instead of Hive serde in CTAS. This flag is effective only if " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 811d186b17d2..154d07f80d89 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDe
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand}
+import org.apache.spark.sql.hive.execution.InsertIntoHiveTable.BY_CTAS
 
 /**
  * Create table and insert the query result into it.
@@ -98,13 +99,15 @@ case class CreateHiveTableAsSelectCommand(
       tableExists: Boolean): DataWritingCommand = {
     // For CTAS, there is no static partition values to insert.
     val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
-    InsertIntoHiveTable(
+    val insertHive = InsertIntoHiveTable(
       tableDesc,
       partition,
       query,
       overwrite = false,
       ifPartitionNotExists = false,
       outputColumnNames = outputColumnNames)
+    insertHive.setTagValue(BY_CTAS, ())
+    insertHive
   }
 
   override def argString(maxFields: Int): String = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 4a92bfd84040..cf296e8be4f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SparkPlan
@@ -235,6 +236,12 @@ case class InsertIntoHiveTable(
 }
 
 object InsertIntoHiveTable extends V1WritesHiveUtils {
+
+  /**
+   * A tag to identify if this command is created by a CTAS.
+   */
+  val BY_CTAS = TreeNodeTag[Unit]("by_ctas")
+
   def apply(
       table: CatalogTable,
       partition: Map[String, Option[String]],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
index e52d9b639dc4..610fc246cd84 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
@@ -284,6 +284,43 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
     }
   }
 
+  test("SPARK-47850 ORC conversation could be applied for unpartitioned table 
insertion") {
+    withTempView("single") {
+      val singleRowDF = Seq((0, "foo")).toDF("key", "value")
+      singleRowDF.createOrReplaceTempView("single")
+      Seq("true", "false").foreach { conversion =>
+        withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true",
+          HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE.key -> conversion) {
+          withTable("dummy_orc_unpartitioned") {
+            spark.sql(
+              s"""
+                 |CREATE TABLE dummy_orc_unpartitioned(key INT, value STRING)
+                 |STORED AS ORC
+                 """.stripMargin)
+
+            spark.sql(
+              s"""
+                 |INSERT INTO TABLE dummy_orc_unpartitioned
+                 |SELECT key, value FROM single
+                 """.stripMargin)
+
+            val orcUnpartitionedTable = TableIdentifier("dummy_orc_unpartitioned", Some("default"))
+            if (conversion == "true") {
+              // if converted, we refresh the cached relation.
+              assert(getCachedDataSourceTable(orcUnpartitionedTable) === null)
+            } else {
+              // otherwise, not cached.
+              assert(getCachedDataSourceTable(orcUnpartitionedTable) === null)
+            }
+
+            val df = spark.sql("SELECT key, value FROM dummy_orc_unpartitioned WHERE key=0")
+            checkAnswer(df, singleRowDF)
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-32234 read ORC table with column names all starting with 
'_col'") {
     Seq("native", "hive").foreach { orcImpl =>
       Seq("false", "true").foreach { vectorized =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
