Repository: spark
Updated Branches:
  refs/heads/master 8c3b0052f -> e6d1406ab


[SPARK-5498][SQL]fix query exception when partition schema does not match table 
schema

In hive,the schema of partition may be difference from  the table schema.When 
we use spark-sql to query the data of partition which schema is difference from 
the table schema,we will get the exceptions as the description of the 
[jira](https://issues.apache.org/jira/browse/SPARK-5498) .For example:
* We take a look of the schema for the partition and the table

```sql
DESCRIBE partition_test PARTITION (dt='1');
id                      int                     None
name                    string                  None
dt                      string                  None

# Partition Information
# col_name              data_type               comment

dt                      string                  None
```
```
DESCRIBE partition_test;
OK
id                      bigint                  None
name                    string                  None
dt                      string                  None

# Partition Information
# col_name              data_type               comment

dt                      string                  None
```
*  run the sql
```sql
SELECT * FROM partition_test where dt='1';
```
we will get the cast exception `java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.MutableLong cannot be cast to 
org.apache.spark.sql.catalyst.expressions.MutableInt`

Author: jeanlyn <jeanly...@gmail.com>

Closes #4289 from jeanlyn/schema and squashes the following commits:

9c8da74 [jeanlyn] fix style
b41d6b9 [jeanlyn] fix compile errors
07d84b6 [jeanlyn] Merge branch 'master' into schema
535b0b6 [jeanlyn] reduce conflicts
d6c93c5 [jeanlyn] fix bug
1e8b30c [jeanlyn] fix code style
0549759 [jeanlyn] fix code style
c879aa1 [jeanlyn] clean the code
2a91a87 [jeanlyn] add more test case and clean the code
12d800d [jeanlyn] fix code style
63d170a [jeanlyn] fix compile problem
7470901 [jeanlyn] reduce conflicts
afc7da5 [jeanlyn] make getConvertedOI compatible between 0.12.0 and 0.13.1
b1527d5 [jeanlyn] fix type mismatch
10744ca [jeanlyn] Insert a space after the start of the comment
3b27af3 [jeanlyn] SPARK-5498:fix bug when query the data when partition schema 
does not match table schema


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6d1406a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6d1406a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6d1406a

Branch: refs/heads/master
Commit: e6d1406abd55bc24477eb8c6ee72c31e7110435e
Parents: 8c3b005
Author: jeanlyn <jeanly...@gmail.com>
Authored: Wed Mar 25 17:47:45 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Mar 25 17:47:45 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/TableReader.scala | 37 ++++++++++++-----
 .../sql/hive/InsertIntoHiveTableSuite.scala     | 42 ++++++++++++++++++++
 .../org/apache/spark/sql/hive/Shim12.scala      | 10 ++++-
 .../org/apache/spark/sql/hive/Shim13.scala      |  9 ++++-
 4 files changed, 84 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e6d1406a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index af309c0..3563472 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table 
=> HiveTable}
 import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
 import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, 
StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -116,7 +116,7 @@ class HadoopTableReader(
       val hconf = broadcastedHiveConf.value.value
       val deserializer = deserializerClass.newInstance()
       deserializer.initialize(hconf, tableDesc.getProperties)
-      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, 
mutableRow)
+      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, 
mutableRow, deserializer)
     }
 
     deserializedHadoopRDD
@@ -189,9 +189,13 @@ class HadoopTableReader(
         val hconf = broadcastedHiveConf.value.value
         val deserializer = localDeserializer.newInstance()
         deserializer.initialize(hconf, partProps)
+        // get the table deserializer
+        val tableSerDe = tableDesc.getDeserializerClass.newInstance()
+        tableSerDe.initialize(hconf, tableDesc.getProperties)
 
         // fill the non partition key attributes
-        HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, 
mutableRow)
+        HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,
+          mutableRow, tableSerDe)
       }
     }.toSeq
 
@@ -261,25 +265,36 @@ private[hive] object HadoopTableReader extends 
HiveInspectors {
    * Transform all given raw `Writable`s into `Row`s.
    *
    * @param iterator Iterator of all `Writable`s to be transformed
-   * @param deserializer The `Deserializer` associated with the input 
`Writable`
+   * @param rawDeser The `Deserializer` associated with the input `Writable`
    * @param nonPartitionKeyAttrs Attributes that should be filled together 
with their corresponding
    *                             positions in the output schema
    * @param mutableRow A reusable `MutableRow` that should be filled
+   * @param tableDeser Table Deserializer
    * @return An `Iterator[Row]` transformed from `iterator`
    */
   def fillObject(
       iterator: Iterator[Writable],
-      deserializer: Deserializer,
+      rawDeser: Deserializer,
       nonPartitionKeyAttrs: Seq[(Attribute, Int)],
-      mutableRow: MutableRow): Iterator[Row] = {
+      mutableRow: MutableRow,
+      tableDeser: Deserializer): Iterator[Row] = {
+
+    val soi = if 
(rawDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
+      rawDeser.getObjectInspector.asInstanceOf[StructObjectInspector]
+    } else {
+      HiveShim.getConvertedOI(
+        rawDeser.getObjectInspector,
+        tableDeser.getObjectInspector).asInstanceOf[StructObjectInspector]
+    }
 
-    val soi = 
deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
     val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, 
ordinal) =>
       soi.getStructFieldRef(attr.name) -> ordinal
     }.unzip
 
-    // Builds specific unwrappers ahead of time according to object inspector 
types to avoid pattern
-    // matching and branching costs per row.
+    /**
+     * Builds specific unwrappers ahead of time according to object inspector
+     * types to avoid pattern matching and branching costs per row.
+     */
     val unwrappers: Seq[(Any, MutableRow, Int) => Unit] = fieldRefs.map {
       _.getFieldObjectInspector match {
         case oi: BooleanObjectInspector =>
@@ -316,9 +331,11 @@ private[hive] object HadoopTableReader extends 
HiveInspectors {
       }
     }
 
+    val converter = 
ObjectInspectorConverters.getConverter(rawDeser.getObjectInspector, soi)
+
     // Map each tuple to a row object
     iterator.map { value =>
-      val raw = deserializer.deserialize(value)
+      val raw = converter.convert(rawDeser.deserialize(value))
       var i = 0
       while (i < fieldRefs.length) {
         val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))

http://git-wip-us.apache.org/repos/asf/spark/blob/e6d1406a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 381cd2a..aa6fb42 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -32,9 +32,12 @@ import org.apache.spark.sql.hive.test.TestHive._
 
 case class TestData(key: Int, value: String)
 
+case class ThreeCloumntable(key: Int, value: String, key1: String)
+
 class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
   import org.apache.spark.sql.hive.test.TestHive.implicits._
 
+
   val testData = TestHive.sparkContext.parallelize(
     (1 to 100).map(i => TestData(i, i.toString))).toDF()
 
@@ -186,4 +189,43 @@ class InsertIntoHiveTableSuite extends QueryTest with 
BeforeAndAfter {
 
     sql("DROP TABLE hiveTableWithStructValue")
   }
+
+  test("SPARK-5498:partition schema does not match table schema") {
+    val testData = TestHive.sparkContext.parallelize(
+      (1 to 10).map(i => TestData(i, i.toString))).toDF()
+    testData.registerTempTable("testData")
+
+    val testDatawithNull = TestHive.sparkContext.parallelize(
+      (1 to 10).map(i => ThreeCloumntable(i, i.toString,null))).toDF()
+
+    val tmpDir = Files.createTempDir()
+    sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED 
by (ds string) location '${tmpDir.toURI.toString}' ")
+    sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='1') 
SELECT key,value FROM testData")
+
+    // test schema the same between partition and table
+    sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
+    checkAnswer(sql("select key,value from table_with_partition where ds='1' 
"),
+      testData.collect.toSeq
+    )
+    
+    // test difference type of field
+    sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
+    checkAnswer(sql("select key,value from table_with_partition where ds='1' 
"),
+      testData.collect.toSeq
+    )
+
+    // add column to table
+    sql("ALTER TABLE table_with_partition ADD COLUMNS(key1 string)")
+    checkAnswer(sql("select key,value,key1 from table_with_partition where 
ds='1' "),
+      testDatawithNull.collect.toSeq
+    )
+
+    // change column name to table
+    sql("ALTER TABLE table_with_partition CHANGE COLUMN key keynew BIGINT")
+    checkAnswer(sql("select keynew,value from table_with_partition where 
ds='1' "),
+      testData.collect.toSeq
+    )
+
+    sql("DROP TABLE table_with_partition")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e6d1406a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala 
b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
index 30646dd..0ed93c2 100644
--- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
+++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, 
FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst
 import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer, io 
=> hiveIo}
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, 
PrimitiveObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, 
ObjectInspector, PrimitiveObjectInspector}
 import 
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector,
 PrimitiveObjectInspectorFactory}
 import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, TypeInfoFactory}
@@ -210,7 +210,7 @@ private[hive] object HiveShim {
 
   def getDataLocationPath(p: Partition) = p.getPartitionPath
 
-  def getAllPartitionsOf(client: Hive, tbl: Table) =  
client.getAllPartitionsForPruner(tbl)
+  def getAllPartitionsOf(client: Hive, tbl: Table) = 
client.getAllPartitionsForPruner(tbl)
 
   def compatibilityBlackList = Seq(
     "decimal_.*",
@@ -244,6 +244,12 @@ private[hive] object HiveShim {
     }
   }
 
+  def getConvertedOI(
+      inputOI: ObjectInspector,
+      outputOI: ObjectInspector): ObjectInspector = {
+    ObjectInspectorConverters.getConvertedOI(inputOI, outputOI, true)
+  }
+
   def prepareWritable(w: Writable): Writable = {
     w
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e6d1406a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala 
b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index f9fcbda..7577309 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import java.util
 import java.util.{ArrayList => JArrayList}
 import java.util.Properties
 import java.rmi.server.UID
@@ -38,7 +39,7 @@ import 
org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, DecimalTypeInfo, 
TypeInfoFactory}
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector,
 PrimitiveObjectInspectorFactory}
-import 
org.apache.hadoop.hive.serde2.objectinspector.{PrimitiveObjectInspector, 
ObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, 
PrimitiveObjectInspector, ObjectInspector}
 import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils}
 import org.apache.hadoop.hive.serde2.{io => hiveIo}
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
@@ -400,7 +401,11 @@ private[hive] object HiveShim {
       Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), 
hdoi.precision(), hdoi.scale())
     }
   }
- 
+
+  def getConvertedOI(inputOI: ObjectInspector, outputOI: ObjectInspector): 
ObjectInspector = {
+    ObjectInspectorConverters.getConvertedOI(inputOI, outputOI)
+  }
+
   /*
    * Bug introduced in hive-0.13. AvroGenericRecordWritable has a member 
recordReaderID that
    * is needed to initialize before serialization.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to