Repository: spark
Updated Branches:
  refs/heads/master 6ceed852a -> 990c9f79c


[SPARK-9170] [SQL] Use OrcStructInspector to be case preserving when writing ORC files

JIRA: https://issues.apache.org/jira/browse/SPARK-9170

`StandardStructObjectInspector` implicitly lowercases column names, but the
ORC format has no such requirement. In fact, there is an `OrcStructInspector`
dedicated to the ORC format. We should use it when serializing rows to ORC
files, so that column-name case is preserved on write.
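
For context, a minimal spark-shell style sketch (not part of this patch) of
the behavior described above. It assumes the Hive serde and ORC classes
bundled with sql/hive are on the classpath and that
`OrcStruct.createObjectInspector` is callable from the shell, as it is from
the patched writer; it only illustrates how the two inspectors treat the
field name `Acol`.

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.io.orc.OrcStruct
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}

// A struct type with a single, intentionally mixed-case column name.
val typeInfo = TypeInfoUtils
  .getTypeInfoFromTypeString("struct<Acol:bigint>")
  .asInstanceOf[StructTypeInfo]

// Inspector used before this patch: the field name comes back lowercased ("acol").
val standardOI = TypeInfoUtils
  .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
  .asInstanceOf[StructObjectInspector]
standardOI.getAllStructFieldRefs.asScala.foreach(f => println(f.getFieldName))

// ORC-specific inspector used after this patch: the field name keeps its case ("Acol").
val orcOI = OrcStruct.createObjectInspector(typeInfo)
  .asInstanceOf[StructObjectInspector]
orcOI.getAllStructFieldRefs.asScala.foreach(f => println(f.getFieldName))
```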

Author: Liang-Chi Hsieh <vii...@appier.com>

Closes #7520 from viirya/use_orcstruct.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/990c9f79
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/990c9f79
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/990c9f79

Branch: refs/heads/master
Commit: 990c9f79c28db501018a0a3af446ff879962475d
Parents: 6ceed85
Author: Liang-Chi Hsieh <vii...@appier.com>
Authored: Tue Sep 8 23:07:34 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Tue Sep 8 23:07:34 2015 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 47 +++++++++++---------
 .../spark/sql/hive/orc/OrcQuerySuite.scala      | 14 ++++++
 2 files changed, 40 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/990c9f79/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 4eeca9a..7e89109 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -25,9 +25,9 @@ import com.google.common.base.Objects
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit}
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct}
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
+import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo}
 import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -89,21 +89,10 @@ private[orc] class OrcOutputWriter(
       TypeInfoUtils.getTypeInfoFromTypeString(
         HiveMetastoreTypes.toMetastoreType(dataSchema))
 
-    TypeInfoUtils
-      .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
-      .asInstanceOf[StructObjectInspector]
+    OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
+      .asInstanceOf[SettableStructObjectInspector]
   }
 
-  // Used to hold temporary `Writable` fields of the next row to be written.
-  private val reusableOutputBuffer = new Array[Any](dataSchema.length)
-
-  // Used to convert Catalyst values into Hadoop `Writable`s.
-  private val wrappers = structOI.getAllStructFieldRefs.asScala
-    .zip(dataSchema.fields.map(_.dataType))
-    .map { case (ref, dt) =>
-      wrapperFor(ref.getFieldObjectInspector, dt)
-    }.toArray
-
   // `OrcRecordWriter.close()` creates an empty file if no rows are written at all.  We use this
   // flag to decide whether `OrcRecordWriter.close()` needs to be called.
   private var recordWriterInstantiated = false
@@ -127,16 +116,32 @@ private[orc] class OrcOutputWriter(
 
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
 
-  override protected[sql] def writeInternal(row: InternalRow): Unit = {
+  private def wrapOrcStruct(
+      struct: OrcStruct,
+      oi: SettableStructObjectInspector,
+      row: InternalRow): Unit = {
+    val fieldRefs = oi.getAllStructFieldRefs
     var i = 0
-    while (i < row.numFields) {
-      reusableOutputBuffer(i) = wrappers(i)(row.get(i, dataSchema(i).dataType))
+    while (i < fieldRefs.size) {
+      oi.setStructFieldData(
+        struct,
+        fieldRefs.get(i),
+        wrap(
+          row.get(i, dataSchema(i).dataType),
+          fieldRefs.get(i).getFieldObjectInspector,
+          dataSchema(i).dataType))
       i += 1
     }
+  }
+
+  val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
+
+  override protected[sql] def writeInternal(row: InternalRow): Unit = {
+    wrapOrcStruct(cachedOrcStruct, structOI, row)
 
     recordWriter.write(
       NullWritable.get(),
-      serializer.serialize(reusableOutputBuffer, structOI))
+      serializer.serialize(cachedOrcStruct, structOI))
   }
 
   override def close(): Unit = {
@@ -259,7 +264,7 @@ private[orc] case class OrcTableScan(
     maybeStructOI.map { soi =>
       val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
         case (attr, ordinal) =>
-          soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
+          soi.getStructFieldRef(attr.name) -> ordinal
       }.unzip
       val unwrappers = fieldRefs.map(unwrapperFor)
       // Map each tuple to a row object

http://git-wip-us.apache.org/repos/asf/spark/blob/990c9f79/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 744d462..8bc33fc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -287,6 +287,20 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  test("SPARK-9170: Don't implicitly lowercase of user-provided columns") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      sqlContext.range(0, 10).select('id as "Acol").write.format("orc").save(path)
+      sqlContext.read.format("orc").load(path).schema("Acol")
+      intercept[IllegalArgumentException] {
+        sqlContext.read.format("orc").load(path).schema("acol")
+      }
+      checkAnswer(sqlContext.read.format("orc").load(path).select("acol").sort("acol"),
+        (0 until 10).map(Row(_)))
+    }
+  }
+
   test("SPARK-8501: Avoids discovery schema from empty ORC files") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath

