Repository: spark Updated Branches: refs/heads/master 6ceed852a -> 990c9f79c
[SPARK-9170] [SQL] Use OrcStructInspector to be case preserving when writing ORC files JIRA: https://issues.apache.org/jira/browse/SPARK-9170 `StandardStructObjectInspector` will implicitly lowercase column names. But I think Orc format doesn't have such requirement. In fact, there is a `OrcStructInspector` specified for Orc format. We should use it when serialize rows to Orc file. It can be case preserving when writing ORC files. Author: Liang-Chi Hsieh <vii...@appier.com> Closes #7520 from viirya/use_orcstruct. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/990c9f79 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/990c9f79 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/990c9f79 Branch: refs/heads/master Commit: 990c9f79c28db501018a0a3af446ff879962475d Parents: 6ceed85 Author: Liang-Chi Hsieh <vii...@appier.com> Authored: Tue Sep 8 23:07:34 2015 +0800 Committer: Cheng Lian <l...@databricks.com> Committed: Tue Sep 8 23:07:34 2015 +0800 ---------------------------------------------------------------------- .../apache/spark/sql/hive/orc/OrcRelation.scala | 47 +++++++++++--------- .../spark/sql/hive/orc/OrcQuerySuite.scala | 14 ++++++ 2 files changed, 40 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/990c9f79/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 4eeca9a..7e89109 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -25,9 +25,9 @@ import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit} -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct} +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector +import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo} import org.apache.hadoop.io.{NullWritable, Writable} import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat @@ -89,21 +89,10 @@ private[orc] class OrcOutputWriter( TypeInfoUtils.getTypeInfoFromTypeString( HiveMetastoreTypes.toMetastoreType(dataSchema)) - TypeInfoUtils - .getStandardJavaObjectInspectorFromTypeInfo(typeInfo) - .asInstanceOf[StructObjectInspector] + OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo]) + .asInstanceOf[SettableStructObjectInspector] } - // Used to hold temporary `Writable` fields of the next row to be written. - private val reusableOutputBuffer = new Array[Any](dataSchema.length) - - // Used to convert Catalyst values into Hadoop `Writable`s. - private val wrappers = structOI.getAllStructFieldRefs.asScala - .zip(dataSchema.fields.map(_.dataType)) - .map { case (ref, dt) => - wrapperFor(ref.getFieldObjectInspector, dt) - }.toArray - // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this // flag to decide whether `OrcRecordWriter.close()` needs to be called. private var recordWriterInstantiated = false @@ -127,16 +116,32 @@ private[orc] class OrcOutputWriter( override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") - override protected[sql] def writeInternal(row: InternalRow): Unit = { + private def wrapOrcStruct( + struct: OrcStruct, + oi: SettableStructObjectInspector, + row: InternalRow): Unit = { + val fieldRefs = oi.getAllStructFieldRefs var i = 0 - while (i < row.numFields) { - reusableOutputBuffer(i) = wrappers(i)(row.get(i, dataSchema(i).dataType)) + while (i < fieldRefs.size) { + oi.setStructFieldData( + struct, + fieldRefs.get(i), + wrap( + row.get(i, dataSchema(i).dataType), + fieldRefs.get(i).getFieldObjectInspector, + dataSchema(i).dataType)) i += 1 } + } + + val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct] + + override protected[sql] def writeInternal(row: InternalRow): Unit = { + wrapOrcStruct(cachedOrcStruct, structOI, row) recordWriter.write( NullWritable.get(), - serializer.serialize(reusableOutputBuffer, structOI)) + serializer.serialize(cachedOrcStruct, structOI)) } override def close(): Unit = { @@ -259,7 +264,7 @@ private[orc] case class OrcTableScan( maybeStructOI.map { soi => val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) => - soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal + soi.getStructFieldRef(attr.name) -> ordinal }.unzip val unwrappers = fieldRefs.map(unwrapperFor) // Map each tuple to a row object http://git-wip-us.apache.org/repos/asf/spark/blob/990c9f79/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 744d462..8bc33fc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -287,6 +287,20 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } + test("SPARK-9170: Don't implicitly lowercase of user-provided columns") { + withTempPath { dir => + val path = dir.getCanonicalPath + + sqlContext.range(0, 10).select('id as "Acol").write.format("orc").save(path) + sqlContext.read.format("orc").load(path).schema("Acol") + intercept[IllegalArgumentException] { + sqlContext.read.format("orc").load(path).schema("acol") + } + checkAnswer(sqlContext.read.format("orc").load(path).select("acol").sort("acol"), + (0 until 10).map(Row(_))) + } + } + test("SPARK-8501: Avoids discovery schema from empty ORC files") { withTempPath { dir => val path = dir.getCanonicalPath --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org