msamirkhan commented on a change in pull request #29353:
URL: https://github.com/apache/spark/pull/29353#discussion_r465860164



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcSerializer.scala
##########
@@ -150,69 +156,110 @@ class OrcSerializer(dataSchema: StructType) {
     case DecimalType.Fixed(precision, scale) =>
       OrcShimUtils.getHiveDecimalWritable(precision, scale)
 
-    case st: StructType => (getter, ordinal) =>
-      val result = createOrcValue(st).asInstanceOf[OrcStruct]
-      val fieldConverters = st.map(_.dataType).map(newConverter(_))
-      val numFields = st.length
-      val struct = getter.getStruct(ordinal, numFields)
-      var i = 0
-      while (i < numFields) {
-        if (struct.isNullAt(i)) {
-          result.setFieldValue(i, null)
-        } else {
-          result.setFieldValue(i, fieldConverters(i)(struct, i))
+    case st: StructType =>
+      val orcSchema = schemaConverter(st)
+      val fieldConverters = st.map(_.dataType).map(newConverter(_, reuseObj))
+      val baseConverter: (OrcStruct, SpecializedGetters, Int) => OrcStruct =
+        (result, getter, ordinal) => {
+          val numFields = st.length
+          val struct = getter.getStruct(ordinal, numFields)
+          var i = 0
+          while (i < numFields) {
+            if (struct.isNullAt(i)) {
+              result.setFieldValue(i, null)
+            } else {
+              result.setFieldValue(i, fieldConverters(i)(struct, i))
+            }
+            i += 1
+          }
+          result
         }
-        i += 1
-      }
-      result
 
-    case ArrayType(elementType, _) => (getter, ordinal) =>
-      val result = createOrcValue(dataType).asInstanceOf[OrcList[WritableComparable[_]]]
+//      if (reuseObj) {
+//        val result = new OrcStruct(orcSchema)
+//        (getter, ordinal) => baseConverter.apply(result, getter, ordinal)
+//      } else {
+        (getter, ordinal) =>
+          val result = new OrcStruct(orcSchema)
+          baseConverter.apply(result, getter, ordinal)
+//      }
+
+    case ArrayType(elementType, _) =>
+      val orcSchema = schemaConverter(dataType)
       // Need to put all converted values to a list, can't reuse object.
-      val elementConverter = newConverter(elementType, reuseObj = false)
-      val array = getter.getArray(ordinal)
-      var i = 0
-      while (i < array.numElements()) {
-        if (array.isNullAt(i)) {
-          result.add(null)
-        } else {
-          result.add(elementConverter(array, i))
+      val elementConverter = newConverter(elementType, false)
+      val baseArrayConverter: (OrcList[WritableComparable[_]], SpecializedGetters, Int) => Unit = {
+        (result, getter, ordinal) => {
+          val array = getter.getArray(ordinal)
+          var i = 0
+          while (i < array.numElements()) {
+            if (array.isNullAt(i)) {
+              result.add(null)
+            } else {
+              result.add(elementConverter(array, i))
+            }
+            i += 1
+          }
         }
-        i += 1
       }
-      result
 
-    case MapType(keyType, valueType, _) => (getter, ordinal) =>
-      val result = createOrcValue(dataType)
-        .asInstanceOf[OrcMap[WritableComparable[_], WritableComparable[_]]]
+//      if (reuseObj) {
+//        val result = new OrcList[WritableComparable[_]](orcSchema)
+//        (getter, ordinal) => {
+//          result.clear()
+//          baseArrayConverter.apply(result, getter, ordinal)
+//          result
+//        }
+//      } else {
+        (getter, ordinal) => {
+          val result = new OrcList[WritableComparable[_]](orcSchema)
+          baseArrayConverter.apply(result, getter, ordinal)
+          result
+        }
+//      }
+
+    case MapType(keyType, valueType, _) =>
+      val orcSchema = schemaConverter(dataType)
       // Need to put all converted values to a list, can't reuse object.
-      val keyConverter = newConverter(keyType, reuseObj = false)
-      val valueConverter = newConverter(valueType, reuseObj = false)
-      val map = getter.getMap(ordinal)
-      val keyArray = map.keyArray()
-      val valueArray = map.valueArray()
-      var i = 0
-      while (i < map.numElements()) {
-        val key = keyConverter(keyArray, i)
-        if (valueArray.isNullAt(i)) {
-          result.put(key, null)
-        } else {
-          result.put(key, valueConverter(valueArray, i))
+      val keyConverter = newConverter(keyType, false)
+      val valueConverter = newConverter(valueType, false)
+      val baseMapConverter:
+        (OrcMap[WritableComparable[_], WritableComparable[_]], SpecializedGetters, Int) => Unit = {
+        (result, getter, ordinal) => {
+          val map = getter.getMap(ordinal)
+          val keyArray = map.keyArray()
+          val valueArray = map.valueArray()
+          var i = 0
+          while (i < map.numElements()) {
+            val key = keyConverter(keyArray, i)
+            if (valueArray.isNullAt(i)) {
+              result.put(key, null)
+            } else {
+              result.put(key, valueConverter(valueArray, i))
+            }
+            i += 1
+          }
         }
-        i += 1
       }
-      result
 
-    case udt: UserDefinedType[_] => newConverter(udt.sqlType)
+//      if (reuseObj) {
+//        val result = new OrcMap[WritableComparable[_], WritableComparable[_]](orcSchema)
+//        (getter, ordinal) => {
+//          result.clear()
+//          baseMapConverter.apply(result, getter, ordinal)
+//          result
+//        }
+//      } else {
+        (getter, ordinal) => {
+          val result = new OrcMap[WritableComparable[_], WritableComparable[_]](orcSchema)
+          baseMapConverter.apply(result, getter, ordinal)
+          result
+        }
+//      }
+
+    case udt: UserDefinedType[_] => newConverter(udt.sqlType, reuseObj)
 
     case _ =>
       throw new UnsupportedOperationException(s"$dataType is not supported yet.")
   }
-
-  /**
-   * Return a Orc value object for the given Spark schema.
-   */
-  private def createOrcValue(dataType: DataType) = {
-    OrcStruct.createValue(TypeDescription.fromString(OrcFileFormat.getQuotedSchemaString(dataType)))
-  }

Review comment:
       From the profiler, this was a prominent hotspot. There are two problems here.
   1) `createOrcValue` converts dataType => String => ORC type description for each data point. This conversion can instead be done once at the schema level (i.e. at the initialization stage) with the new `schemaConverter` method.
   2) `OrcStruct.createValue` (https://github.com/apache/orc/blob/master/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java) for a struct type recursively walks the fields and creates the corresponding child objects until it hits a map, array, union, or primitive type. We weren't actually using those child objects here, so with this change only the single `OrcStruct` that is needed gets allocated. The sketch below contrasts the two paths.
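
   To make the contrast concrete, here is a minimal, self-contained Scala sketch. `toOrcType` is a hypothetical stand-in for the PR's `schemaConverter` (its exact signature isn't shown in this diff); `TypeDescription` and `OrcStruct` are the real ORC mapred APIs.

```scala
// Sketch only, not the PR's code: toOrcType stands in for schemaConverter.
import org.apache.orc.TypeDescription
import org.apache.orc.mapred.OrcStruct
import org.apache.spark.sql.types._

object OrcValueCreationSketch {
  // One-time conversion of a Spark type to an ORC TypeDescription. Doing this
  // once per schema at initialization replaces the old per-value
  // DataType => String => TypeDescription round trip.
  def toOrcType(dt: DataType): TypeDescription = dt match {
    case LongType   => TypeDescription.createLong()
    case StringType => TypeDescription.createString()
    case st: StructType =>
      st.fields.foldLeft(TypeDescription.createStruct()) { (td, f) =>
        td.addField(f.name, toOrcType(f.dataType))
      }
    case other => sys.error(s"sketch does not cover $other")
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))

    // Old path, run per value: parse a type-description string, then let
    // OrcStruct.createValue recursively allocate child objects that the
    // serializer immediately overwrites via setFieldValue.
    val perValue = OrcStruct
      .createValue(TypeDescription.fromString("struct<id:bigint,name:string>"))
      .asInstanceOf[OrcStruct]

    // New path: convert the schema once, then allocate only the single
    // OrcStruct actually needed for each row.
    val orcSchema = toOrcType(schema)
    val perRow = new OrcStruct(orcSchema)

    println(perValue.getSchema == perRow.getSchema) // true (structural equality)
  }
}
```

   The same hoisting applies in the list and map branches above, where `new OrcList(orcSchema)` and `new OrcMap(orcSchema)` replace the recursive `createOrcValue` call.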



