spark git commit: [SPARK-13709][SQL] Initialize deserializer with both table and partition properties when reading partitioned tables

2016-06-24 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 3d8d95644 -> 3ccdd6b9c


[SPARK-13709][SQL] Initialize deserializer with both table and partition properties when reading partitioned tables

## What changes were proposed in this pull request?

When reading partitions of a partitioned Hive SerDe table, we currently
initialize the deserializer using only the partition properties. However, for
SerDes like `AvroSerDe`, essential properties (e.g. Avro schema information)
may be defined in the table properties. We should merge both table properties
and partition properties before initializing the deserializer.

Note that an individual partition may have properties that differ from those
defined at the table level (e.g. partitions within a table can use different
SerDes). Thus, for any property key defined in both the partition and the
table properties, the value set in the partition properties wins. The sketch
below illustrates the intended merge semantics.
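
The merge is straightforward to express with `java.util.Properties`, which
supports a defaults table. Below is a minimal, self-contained sketch of the
intended semantics (illustrative only, not the patched Spark code; the
property keys are just examples):

```scala
import java.util.Properties

import scala.collection.JavaConverters._

object MergeSemanticsSketch extends App {
  // Table-level properties, e.g. an Avro schema registered on the table.
  val tableProps = new Properties()
  tableProps.setProperty(
    "avro.schema.literal",
    """{"type": "record", "name": "r", "fields": [{"name": "value", "type": "int"}]}""")
  tableProps.setProperty("serialization.format", "1")

  // Partition-level properties; this partition overrides one key.
  val partProps = new Properties()
  partProps.setProperty("serialization.format", "2")

  // Table properties serve as defaults; partition properties win on conflict.
  val merged = new Properties(tableProps)
  partProps.asScala.foreach { case (key, value) => merged.setProperty(key, value) }

  assert(merged.getProperty("serialization.format") == "2")  // partition wins
  assert(merged.getProperty("avro.schema.literal") != null)  // inherited from table
}
```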

## How was this patch tested?

New test case added in `QueryPartitionSuite`.
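
For context, here is a sketch of the scenario the new test exercises: a
partitioned Hive table backed by `AvroSerDe` whose schema is declared only in
`TBLPROPERTIES`. This is not the actual test code; the SerDe and container
format class names are Hive's standard Avro classes, while the table name,
partition value, and the `spark` session variable are hypothetical:

```scala
// Assumes a SparkSession (or HiveContext) with Hive support bound to `spark`.
spark.sql(
  """CREATE TABLE avro_partitioned (value INT)
    |PARTITIONED BY (ds STRING)
    |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    |STORED AS
    |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    |TBLPROPERTIES ('avro.schema.literal' = '{
    |  "type": "record",
    |  "name": "test_record",
    |  "fields": [{"name": "value", "type": "int"}]
    |}')
  """.stripMargin)

spark.sql("INSERT INTO TABLE avro_partitioned PARTITION (ds = '2016-06-24') SELECT 1")

// Before this patch the scan below failed: the deserializer was initialized
// with partition properties only, so AvroSerDe could not find the schema.
spark.sql("SELECT * FROM avro_partitioned WHERE ds = '2016-06-24'").show()
```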

Author: Cheng Lian 

Closes #13865 from liancheng/spark-13709-partitioned-avro-table.

(cherry picked from commit 2d2f607bfae97f2681df24f48bb8b1b483c6b309)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ccdd6b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ccdd6b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ccdd6b9

Branch: refs/heads/branch-2.0
Commit: 3ccdd6b9c6e63c7498771dcd8673914c46f6794a
Parents: 3d8d956
Author: Cheng Lian 
Authored: Thu Jun 23 23:11:46 2016 -0700
Committer: Yin Huai 
Committed: Thu Jun 23 23:12:24 2016 -0700

--
 .../org/apache/spark/sql/hive/TableReader.scala | 17 +++-
 .../spark/sql/hive/QueryPartitionSuite.scala| 81 
 2 files changed, 97 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3ccdd6b9/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index d044811..e4cb33b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.hive
 
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
@@ -230,10 +234,21 @@ class HadoopTableReader(
       // Fill all partition keys to the given MutableRow object
       fillPartitionKeys(partValues, mutableRow)
 
+      val tableProperties = relation.tableDesc.getProperties
+
       createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
         val hconf = broadcastedHiveConf.value.value
         val deserializer = localDeserializer.newInstance()
-        deserializer.initialize(hconf, partProps)
+        // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
+        // information) may be defined in table properties. Here we should merge table properties
+        // and partition properties before initializing the deserializer. Note that partition
+        // properties take a higher priority here. For example, a partition may have a different
+        // SerDe than the one defined in table properties.
+        val props = new Properties(tableProperties)
+        partProps.asScala.foreach {
+          case (key, value) => props.setProperty(key, value)
+        }
+        deserializer.initialize(hconf, props)
         // get the table deserializer
         val tableSerDe = tableDesc.getDeserializerClass.newInstance()
         tableSerDe.initialize(hconf, tableDesc.getProperties)
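
A subtlety of the fix above: `new Properties(tableProperties)` keeps the table
properties as defaults rather than copying them into the new object. Lookups
via `getProperty`, which is what SerDe implementations use, fall through to
the defaults, but the defaults are not stored as entries of the new instance.
A small demonstration of this JDK behavior (runnable in the Scala REPL; the
key names are arbitrary):

```scala
import java.util.Properties

val defaults = new Properties()
defaults.setProperty("avro.schema.literal", "(some schema)")

val props = new Properties(defaults)
props.setProperty("serialization.format", "2")

// getProperty falls through to the defaults table...
assert(props.getProperty("avro.schema.literal") == "(some schema)")
// ...but defaults are not stored as entries of `props` itself,
assert(!props.keySet.contains("avro.schema.literal"))
// although the enumerating view does include them.
assert(props.stringPropertyNames.contains("avro.schema.literal"))
```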

http://git-wip-us.apache.org/repos/asf/spark/blob/3ccdd6b9/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
--
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index f7650e0..feeaade 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+
 import com.google.common.io.Files
+import org.apache.hadoop.fs.FileSystem
 
 import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHiveSingleton

spark git commit: [SPARK-13709][SQL] Initialize deserializer with both table and partition properties when reading partitioned tables

2016-06-24 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master cc6778ee0 -> 2d2f607bf


[SPARK-13709][SQL] Initialize deserializer with both table and partition properties when reading partitioned tables

## What changes were proposed in this pull request?

When reading partitions of a partitioned Hive SerDe table, we currently
initialize the deserializer using only the partition properties. However, for
SerDes like `AvroSerDe`, essential properties (e.g. Avro schema information)
may be defined in the table properties. We should merge both table properties
and partition properties before initializing the deserializer.

Note that an individual partition may have properties that differ from those
defined at the table level (e.g. partitions within a table can use different
SerDes). Thus, for any property key defined in both the partition and the
table properties, the value set in the partition properties wins.

## How was this patch tested?

New test case added in `QueryPartitionSuite`.

Author: Cheng Lian 

Closes #13865 from liancheng/spark-13709-partitioned-avro-table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d2f607b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d2f607b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d2f607b

Branch: refs/heads/master
Commit: 2d2f607bfae97f2681df24f48bb8b1b483c6b309
Parents: cc6778e
Author: Cheng Lian 
Authored: Thu Jun 23 23:11:46 2016 -0700
Committer: Yin Huai 
Committed: Thu Jun 23 23:11:46 2016 -0700

--
 .../org/apache/spark/sql/hive/TableReader.scala | 17 +++-
 .../spark/sql/hive/QueryPartitionSuite.scala| 81 
 2 files changed, 97 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2d2f607b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index e49a235..b4808fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.hive
 
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
@@ -230,10 +234,21 @@ class HadoopTableReader(
       // Fill all partition keys to the given MutableRow object
       fillPartitionKeys(partValues, mutableRow)
 
+      val tableProperties = relation.tableDesc.getProperties
+
       createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
         val hconf = broadcastedHiveConf.value.value
         val deserializer = localDeserializer.newInstance()
-        deserializer.initialize(hconf, partProps)
+        // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
+        // information) may be defined in table properties. Here we should merge table properties
+        // and partition properties before initializing the deserializer. Note that partition
+        // properties take a higher priority here. For example, a partition may have a different
+        // SerDe than the one defined in table properties.
+        val props = new Properties(tableProperties)
+        partProps.asScala.foreach {
+          case (key, value) => props.setProperty(key, value)
+        }
+        deserializer.initialize(hconf, props)
         // get the table deserializer
         val tableSerDe = tableDesc.getDeserializerClass.newInstance()
         tableSerDe.initialize(hconf, tableDesc.getProperties)

http://git-wip-us.apache.org/repos/asf/spark/blob/2d2f607b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
--
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index f7650e0..feeaade 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+
 import com.google.common.io.Files
+import org.apache.hadoop.fs.FileSystem
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -65,4 +68,82 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
   sql("DROP