Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4148a9c2c -> 6871deb93


[SPARK-15280][Input/Output] Refactored OrcOutputWriter and moved serialization to a new class.

## What changes were proposed in this pull request?
Refactoring: Separated the ORC serialization logic from OrcOutputWriter and moved it into a new class called OrcSerializer.
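
As a rough sketch of the resulting split (taken from the diff below; `dataSchema`, `conf`, and `recordWriter` are assumed to be in scope inside OrcOutputWriter as they are in OrcRelation.scala), the writer now delegates all per-row serialization to the new class:

    // OrcSerializer owns the Hive OrcSerde, the SettableStructObjectInspector,
    // and a reusable OrcStruct, turning an InternalRow into a Writable.
    private[this] val serializer = new OrcSerializer(dataSchema, conf)

    // OrcOutputWriter keeps only the RecordWriter bookkeeping and delegates
    // per-row work to the serializer.
    override protected[sql] def writeInternal(row: InternalRow): Unit = {
      recordWriter.write(NullWritable.get(), serializer.serialize(row))
    }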

## How was this patch tested?
Manual tests & existing tests.

Author: Ergin Seyfe <ese...@fb.com>

Closes #13066 from seyfe/orc_serializer.

(cherry picked from commit c18fa464f404ed2612f8c4d355cb0544b355975b)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6871deb9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6871deb9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6871deb9

Branch: refs/heads/branch-2.0
Commit: 6871deb937fd6d6185b1d2a7a2ea36535ce303ea
Parents: 4148a9c
Author: Ergin Seyfe <ese...@fb.com>
Authored: Sat May 21 16:08:31 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sat May 21 16:08:51 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 84 +++++++++++---------
 1 file changed, 45 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6871deb9/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 6e55137..38f50c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -149,39 +149,70 @@ private[sql] class DefaultSource
   }
 }
 
-private[orc] class OrcOutputWriter(
-    path: String,
-    bucketId: Option[Int],
-    dataSchema: StructType,
-    context: TaskAttemptContext)
-  extends OutputWriter with HiveInspectors {
+private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
+  extends HiveInspectors {
+
+  def serialize(row: InternalRow): Writable = {
+    wrapOrcStruct(cachedOrcStruct, structOI, row)
+    serializer.serialize(cachedOrcStruct, structOI)
+  }
 
-  private val serializer = {
+  private[this] val serializer = {
     val table = new Properties()
     table.setProperty("columns", dataSchema.fieldNames.mkString(","))
     table.setProperty("columns.types", 
dataSchema.map(_.dataType.catalogString).mkString(":"))
 
     val serde = new OrcSerde
-    val configuration = context.getConfiguration
-    serde.initialize(configuration, table)
+    serde.initialize(conf, table)
     serde
   }
 
-  // Object inspector converted from the schema of the relation to be written.
-  private val structOI = {
+  // Object inspector converted from the schema of the relation to be serialized.
+  private[this] val structOI = {
    val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(dataSchema.catalogString)
     OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
       .asInstanceOf[SettableStructObjectInspector]
   }
 
+  private[this] val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
+
+  private[this] def wrapOrcStruct(
+      struct: OrcStruct,
+      oi: SettableStructObjectInspector,
+      row: InternalRow): Unit = {
+    val fieldRefs = oi.getAllStructFieldRefs
+    var i = 0
+    while (i < fieldRefs.size) {
+
+      oi.setStructFieldData(
+        struct,
+        fieldRefs.get(i),
+        wrap(
+          row.get(i, dataSchema(i).dataType),
+          fieldRefs.get(i).getFieldObjectInspector,
+          dataSchema(i).dataType))
+      i += 1
+    }
+  }
+}
+
+private[orc] class OrcOutputWriter(
+    path: String,
+    bucketId: Option[Int],
+    dataSchema: StructType,
+    context: TaskAttemptContext)
+  extends OutputWriter {
+
+  private[this] val conf = context.getConfiguration
+
+  private[this] val serializer = new OrcSerializer(dataSchema, conf)
+
   // `OrcRecordWriter.close()` creates an empty file if no rows are written at all.  We use this
   // flag to decide whether `OrcRecordWriter.close()` needs to be called.
   private var recordWriterInstantiated = false
 
   private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
     recordWriterInstantiated = true
-
-    val conf = context.getConfiguration
     val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val taskAttemptId = context.getTaskAttemptID
     val partition = taskAttemptId.getTaskID.getId
@@ -206,33 +237,8 @@ private[orc] class OrcOutputWriter(
   override def write(row: Row): Unit =
     throw new UnsupportedOperationException("call writeInternal")
 
-  private def wrapOrcStruct(
-      struct: OrcStruct,
-      oi: SettableStructObjectInspector,
-      row: InternalRow): Unit = {
-    val fieldRefs = oi.getAllStructFieldRefs
-    var i = 0
-    while (i < fieldRefs.size) {
-
-      oi.setStructFieldData(
-        struct,
-        fieldRefs.get(i),
-        wrap(
-          row.get(i, dataSchema(i).dataType),
-          fieldRefs.get(i).getFieldObjectInspector,
-          dataSchema(i).dataType))
-      i += 1
-    }
-  }
-
-  val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
-
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    wrapOrcStruct(cachedOrcStruct, structOI, row)
-
-    recordWriter.write(
-      NullWritable.get(),
-      serializer.serialize(cachedOrcStruct, structOI))
+    recordWriter.write(NullWritable.get(), serializer.serialize(row))
   }
 
   override def close(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
