This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d0c4d22  [SPARK-36197][SQL] Use PartitionDesc instead of TableDesc for reading hive partitioned tables
d0c4d22 is described below

commit d0c4d224e0bf8d13ccc36882169e34603e7a6d39
Author: Kent Yao <y...@apache.org>
AuthorDate: Mon Jul 19 15:59:36 2021 +0800

    [SPARK-36197][SQL] Use PartitionDesc instead of TableDesc for reading hive partitioned tables
    
    ### What changes were proposed in this pull request?
    
    A Hive partition can have a `PartitionDesc` that differs from the table-level `TableDesc` in its SerDe, `InputFormatClass`, and `OutputFormatClass`. When reading a Hive partitioned table, we should respect the values in `PartitionDesc`.
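
    As a minimal sketch of the idea (not the exact patch; `inputFormatFor` is a hypothetical helper name used only for illustration), the reader resolves the input format from the partition-level `PartitionDesc` instead of reusing the table-level `TableDesc`:

    ```scala
    import org.apache.hadoop.hive.ql.exec.Utilities
    import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
    import org.apache.hadoop.hive.ql.plan.{PartitionDesc, TableDesc}

    // Hypothetical helper: resolve the input format per partition instead of per table.
    def inputFormatFor(tableDesc: TableDesc, partition: HivePartition): Class[_] = {
      // The trailing `true` mirrors the call used in the patch.
      val partDesc: PartitionDesc =
        Utilities.getPartitionDescFromTableDesc(tableDesc, partition, true)
      // This may differ from tableDesc.getInputFileFormatClass when the partition
      // was created or altered with a different STORED AS / SET FILEFORMAT clause.
      partDesc.getInputFileFormatClass
    }
    ```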
    
    ### Why are the changes needed?
    
    In many cases, Spark reading Hive tables can produce surprising results because of this issue.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Hive partitioned tables whose partitions use a different SerDe or input/output format than the table can now be read correctly by Spark.
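
    For illustration, assuming a `spark` session with Hive support and a hypothetical table `pt` whose partitions were set up through Hive with mixed file formats (this mirrors the scenario exercised by the new test):

    ```scala
    // Hive-side setup (sketch), e.g. via beeline or the Hive client:
    //   create table pt(id int) partitioned by (p int) stored as avro;
    //   insert into pt partition(p=1) select 1;
    //   alter table pt add partition(p=2);
    //   alter table pt partition(p=2) set fileformat textfile;
    //   insert into pt partition(p=2) select 2;
    // Previously Spark used the table-level (Avro) input format for every partition,
    // which could fail or return wrong results for the textfile partition.
    spark.sql("select p, id from pt order by p").show()
    ```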
    
    ### How was this patch tested?
    
    new test added
    
    Closes #33406 from yaooqinn/SPARK-36197.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit ef803566144ba26b124ddff1d3a3fcdf95678317)
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../org/apache/spark/sql/hive/TableReader.scala    | 48 ++++++++++++++++++++--
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 18 ++++++++
 2 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 96949a3..a048026 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
-import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.plan.{PartitionDesc, TableDesc}
 import org.apache.hadoop.hive.serde2.Deserializer
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.AvroTableProperties
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
@@ -201,7 +201,7 @@ class HadoopTableReader(
 
     val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
       .map { case (partition, partDeserializer) =>
-      val partDesc = Utilities.getPartitionDesc(partition)
+      val partDesc = Utilities.getPartitionDescFromTableDesc(tableDesc, partition, true)
       val partPath = partition.getDataLocation
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
       // Get partition field info
@@ -245,7 +245,8 @@ class HadoopTableReader(
 
       // Create local references so that the outer object isn't serialized.
       val localTableDesc = tableDesc
-      createHadoopRDD(localTableDesc, inputPathStr).mapPartitions { iter =>
+
+      createHadoopRDD(partDesc, inputPathStr).mapPartitions { iter =>
         val hconf = broadcastedHiveConf.value.value
         val deserializer = localDeserializer.getConstructor().newInstance()
        // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
@@ -314,6 +315,15 @@ class HadoopTableReader(
     }
   }
 
+  private def createHadoopRDD(partitionDesc: PartitionDesc, inputPathStr: String): RDD[Writable] = {
+    val inputFormatClazz = partitionDesc.getInputFileFormatClass
+    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
+      createNewHadoopRDD(partitionDesc, inputPathStr)
+    } else {
+      createOldHadoopRDD(partitionDesc, inputPathStr)
+    }
+  }
+
   /**
   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each executor.
@@ -322,7 +332,24 @@ class HadoopTableReader(
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
     val inputFormatClass = tableDesc.getInputFileFormatClass
       .asInstanceOf[Class[oldInputClass[Writable, Writable]]]
+    createOldHadoopRDD(inputFormatClass, initializeJobConfFunc)
+  }
 
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each executor.
+   */
+  private def createOldHadoopRDD(partitionDesc: PartitionDesc, path: String): RDD[Writable] = {
+    val initializeJobConfFunc =
+      HadoopTableReader.initializeLocalJobConfFunc(path, partitionDesc.getTableDesc) _
+    val inputFormatClass = partitionDesc.getInputFileFormatClass
+      .asInstanceOf[Class[oldInputClass[Writable, Writable]]]
+    createOldHadoopRDD(inputFormatClass, initializeJobConfFunc)
+  }
+
+  private def createOldHadoopRDD(
+      inputFormatClass: Class[oldInputClass[Writable, Writable]],
+      initializeJobConfFunc: JobConf => Unit): RDD[Writable] = {
     val rdd = new HadoopRDD(
      sparkSession.sparkContext,
      _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
@@ -345,13 +372,26 @@ class HadoopTableReader(
     HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(newJobConf)
     val inputFormatClass = tableDesc.getInputFileFormatClass
       .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+    createNewHadoopRDD(inputFormatClass, newJobConf)
+  }
+
+  private def createNewHadoopRDD(partDesc: PartitionDesc, path: String): RDD[Writable] = {
+    val newJobConf = new JobConf(hadoopConf)
+    HadoopTableReader.initializeLocalJobConfFunc(path, partDesc.getTableDesc)(newJobConf)
+    val inputFormatClass = partDesc.getInputFileFormatClass
+      .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+    createNewHadoopRDD(inputFormatClass, newJobConf)
+  }
 
+  private def createNewHadoopRDD(
+      inputFormatClass: Class[newInputClass[Writable, Writable]],
+      jobConf: JobConf): RDD[Writable] = {
     val rdd = new NewHadoopRDD(
       sparkSession.sparkContext,
       inputFormatClass,
       classOf[Writable],
       classOf[Writable],
-      newJobConf
+      jobConf
     )
 
     // Only take the value (skip the key) because Hive works only with values.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 684b2f2..a84db58 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2614,6 +2614,24 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
       }
     }
   }
+
+  test("SPARK-36197: Use PartitionDesc instead of TableDesc for reading hive 
partitioned tables") {
+    withTempDir { dir =>
+      val t1Loc = s"file:///$dir/t1"
+      val t2Loc = s"file:///$dir/t2"
+      withTable("t1", "t2") {
+        hiveClient.runSqlHive(
+          s"create table t1(id int) partitioned by(pid int) stored as avro 
location '$t1Loc'")
+        hiveClient.runSqlHive("insert into t1 partition(pid=1) select 2")
+        hiveClient.runSqlHive(
+          s"create table t2(id int) partitioned by(pid int) stored as textfile 
location '$t2Loc'")
+        hiveClient.runSqlHive("insert into t2 partition(pid=2) select 2")
+        hiveClient.runSqlHive(s"alter table t1 add partition (pid=2) location 
'$t2Loc/pid=2'")
+        hiveClient.runSqlHive("alter table t1 partition(pid=2) SET FILEFORMAT 
textfile")
+        checkAnswer(sql("select pid, id from t1 order by pid"), Seq(Row(1, 2), Row(2, 2)))
+      }
+    }
+  }
 }
 
 @SlowHiveTest

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
