This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d0c4d22  [SPARK-36197][SQL] Use PartitionDesc instead of TableDesc for reading hive partitioned tables
d0c4d22 is described below

commit d0c4d224e0bf8d13ccc36882169e34603e7a6d39
Author: Kent Yao <y...@apache.org>
AuthorDate: Mon Jul 19 15:59:36 2021 +0800

    [SPARK-36197][SQL] Use PartitionDesc instead of TableDesc for reading hive partitioned tables

    ### What changes were proposed in this pull request?

    A Hive partition can have a `PartitionDesc` whose SerDe, InputFormatClass, and OutputFormatClass differ from those in the table-level `TableDesc`. When reading a Hive partitioned table, we should respect the values in each partition's `PartitionDesc`.

    ### Why are the changes needed?

    In many cases, Spark can return surprising results when reading Hive partitioned tables because of this issue.

    ### Does this PR introduce _any_ user-facing change?

    Yes. Hive partitioned tables whose partitions use different SerDes or input/output formats can now be read correctly by Spark.

    ### How was this patch tested?

    A new test was added.

    Closes #33406 from yaooqinn/SPARK-36197.

    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit ef803566144ba26b124ddff1d3a3fcdf95678317)
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../org/apache/spark/sql/hive/TableReader.scala    | 48 ++++++++++++++++++++--
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 18 ++++++++
 2 files changed, 62 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 96949a3..a048026 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
-import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.plan.{PartitionDesc, TableDesc}
 import org.apache.hadoop.hive.serde2.Deserializer
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.AvroTableProperties
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
@@ -201,7 +201,7 @@ class HadoopTableReader(
     val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
       .map { case (partition, partDeserializer) =>
-        val partDesc = Utilities.getPartitionDesc(partition)
+        val partDesc = Utilities.getPartitionDescFromTableDesc(tableDesc, partition, true)
         val partPath = partition.getDataLocation
         val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
         // Get partition field info
@@ -245,7 +245,8 @@
       // Create local references so that the outer object isn't serialized.
       val localTableDesc = tableDesc
-      createHadoopRDD(localTableDesc, inputPathStr).mapPartitions { iter =>
+
+      createHadoopRDD(partDesc, inputPathStr).mapPartitions { iter =>
         val hconf = broadcastedHiveConf.value.value
         val deserializer = localDeserializer.getConstructor().newInstance()
         // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
@@ -314,6 +315,15 @@ class HadoopTableReader(
     }
   }
 
+  private def createHadoopRDD(partitionDesc: PartitionDesc, inputPathStr: String): RDD[Writable] = {
+    val inputFormatClazz = partitionDesc.getInputFileFormatClass
+    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
+      createNewHadoopRDD(partitionDesc, inputPathStr)
+    } else {
+      createOldHadoopRDD(partitionDesc, inputPathStr)
+    }
+  }
+
   /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each executor.
@@ -322,7 +332,24 @@ class HadoopTableReader(
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
     val inputFormatClass = tableDesc.getInputFileFormatClass
       .asInstanceOf[Class[oldInputClass[Writable, Writable]]]
+    createOldHadoopRDD(inputFormatClass, initializeJobConfFunc)
+  }
 
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each executor.
+   */
+  private def createOldHadoopRDD(partitionDesc: PartitionDesc, path: String): RDD[Writable] = {
+    val initializeJobConfFunc =
+      HadoopTableReader.initializeLocalJobConfFunc(path, partitionDesc.getTableDesc) _
+    val inputFormatClass = partitionDesc.getInputFileFormatClass
+      .asInstanceOf[Class[oldInputClass[Writable, Writable]]]
+    createOldHadoopRDD(inputFormatClass, initializeJobConfFunc)
+  }
+
+  private def createOldHadoopRDD(
+      inputFormatClass: Class[oldInputClass[Writable, Writable]],
+      initializeJobConfFunc: JobConf => Unit): RDD[Writable] = {
     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
       _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
@@ -345,13 +372,26 @@ class HadoopTableReader(
     HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(newJobConf)
     val inputFormatClass = tableDesc.getInputFileFormatClass
       .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+    createNewHadoopRDD(inputFormatClass, newJobConf)
+  }
+
+  private def createNewHadoopRDD(partDesc: PartitionDesc, path: String): RDD[Writable] = {
+    val newJobConf = new JobConf(hadoopConf)
+    HadoopTableReader.initializeLocalJobConfFunc(path, partDesc.getTableDesc)(newJobConf)
+    val inputFormatClass = partDesc.getInputFileFormatClass
+      .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+    createNewHadoopRDD(inputFormatClass, newJobConf)
+  }
 
+  private def createNewHadoopRDD(
+      inputFormatClass: Class[newInputClass[Writable, Writable]],
+      jobConf: JobConf): RDD[Writable] = {
     val rdd = new NewHadoopRDD(
       sparkSession.sparkContext,
       inputFormatClass,
       classOf[Writable],
       classOf[Writable],
-      newJobConf
+      jobConf
     )
 
     // Only take the value (skip the key) because Hive works only with values.
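For context on the per-partition dispatch introduced above, here is a minimal standalone sketch of the same check outside of `HadoopTableReader`. It only uses calls that appear in the diff plus Hadoop's mapreduce `InputFormat`; the function name `usesNewInputFormat` and the `NewInputFormat` alias are illustrative, and `tableDesc`/`partition` are assumed to come from the Hive metastore as they do in the reader.

```scala
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

// Sketch only: `tableDesc` and `partition` are assumed to be resolved from the metastore,
// as HadoopTableReader does for each partition of the table being read.
def usesNewInputFormat(tableDesc: TableDesc, partition: HivePartition): Boolean = {
  // Resolve the descriptor for this specific partition (the call the patch switches to,
  // instead of the table-level TableDesc).
  val partDesc = Utilities.getPartitionDescFromTableDesc(tableDesc, partition, true)
  // The partition's recorded input format may differ from the table's default; dispatch on
  // whether it implements the new (mapreduce) InputFormat API or the old (mapred) one.
  classOf[NewInputFormat[_, _]].isAssignableFrom(partDesc.getInputFileFormatClass)
}
```

This mirrors the new private `createHadoopRDD(partitionDesc, inputPathStr)` above, which routes to `createNewHadoopRDD` or `createOldHadoopRDD` based on the same check.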
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 684b2f2..a84db58 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2614,6 +2614,24 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
       }
     }
   }
+
+  test("SPARK-36197: Use PartitionDesc instead of TableDesc for reading hive partitioned tables") {
+    withTempDir { dir =>
+      val t1Loc = s"file:///$dir/t1"
+      val t2Loc = s"file:///$dir/t2"
+      withTable("t1", "t2") {
+        hiveClient.runSqlHive(
+          s"create table t1(id int) partitioned by(pid int) stored as avro location '$t1Loc'")
+        hiveClient.runSqlHive("insert into t1 partition(pid=1) select 2")
+        hiveClient.runSqlHive(
+          s"create table t2(id int) partitioned by(pid int) stored as textfile location '$t2Loc'")
+        hiveClient.runSqlHive("insert into t2 partition(pid=2) select 2")
+        hiveClient.runSqlHive(s"alter table t1 add partition (pid=2) location '$t2Loc/pid=2'")
+        hiveClient.runSqlHive("alter table t1 partition(pid=2) SET FILEFORMAT textfile")
+        checkAnswer(sql("select pid, id from t1 order by pid"), Seq(Row(1, 2), Row(2, 2)))
+      }
+    }
+  }
 }
 
 @SlowHiveTest
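As a usage note (not part of the commit), the user-facing effect described above shows up with an ordinary SQL read once such a mixed-format table exists. A minimal sketch, assuming a Hive-enabled session and a table `t1` set up as in the test (table-level Avro, partition `pid=2` pointing at text data and switched to TEXTFILE via Hive DDL); the object and app name are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

object Spark36197ReadExample {
  def main(args: Array[String]): Unit = {
    // Assumes a Hive metastore is configured for this session.
    val spark = SparkSession.builder()
      .appName("spark-36197-read-example") // illustrative name
      .enableHiveSupport()
      .getOrCreate()

    // With this patch, each partition is read using its own PartitionDesc, so the Avro
    // partition (pid=1) and the text partition (pid=2) both deserialize correctly.
    spark.sql("SELECT pid, id FROM t1 ORDER BY pid").show()

    spark.stop()
  }
}
```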