msamirkhan commented on a change in pull request #29354:
URL: https://github.com/apache/spark/pull/29354#discussion_r465980330



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
##########
@@ -40,6 +40,8 @@ private[sql] class AvroOutputWriterFactory(
       path: String,
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
+    context.getConfiguration.set("avro.serialization.data.model",
+      "org.apache.avro.generic.GenericData")

Review comment:
      This is a simple change to use GenericDatumReader in the Avro library instead of ReflectDatumReader (which is the default). Write-time improvements are in the attached [pdf](https://github.com/apache/spark/files/5025167/AvroBenchmarks.pdf), pg 3 under column A.
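   
   As a hedged illustration of what this setting controls (not code from this PR; the selector function below is hypothetical, while GenericDatumReader and ReflectDatumReader are the real Avro classes), the configured data model decides whether Avro builds a generic or a reflection-based datum reader:
   
```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DatumReader
import org.apache.avro.reflect.ReflectDatumReader

// Hypothetical helper: mirrors the effect of "avro.serialization.data.model".
def datumReaderFor(schema: Schema, dataModelClass: String): DatumReader[_] =
  dataModelClass match {
    case "org.apache.avro.generic.GenericData" =>
      new GenericDatumReader[GenericRecord](schema) // no per-field reflection at read time
    case _ =>
      new ReflectDatumReader[AnyRef](schema)        // the reflection-based default
  }
```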

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
##########
@@ -367,15 +372,45 @@ class AvroDeserializer(
     }
   }
 
-  private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
-    case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length))
-    case ByteType => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length))
-    case ShortType => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length))
-    case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length))
-    case LongType => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length))
-    case FloatType => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length))
-    case DoubleType => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length))
-    case _ => new GenericArrayData(new Array[Any](length))
+  private def getArrayDataCreator(elementType: DataType): Int => ArrayData = elementType match {
+    case BooleanType => length => UnsafeArrayData.createFreshArray(length, 1)
+    case ByteType => length => UnsafeArrayData.createFreshArray(length, 1)
+    case ShortType => length => UnsafeArrayData.createFreshArray(length, 2)
+    case IntegerType => length => UnsafeArrayData.createFreshArray(length, 4)
+    case LongType => length => UnsafeArrayData.createFreshArray(length, 8)
+    case FloatType => length => UnsafeArrayData.createFreshArray(length, 4)
+    case DoubleType => length => UnsafeArrayData.createFreshArray(length, 8)
+    case _ => length => new GenericArrayData(new Array[Any](length))
+  }
+
+  private def getRowCreator(st: StructType): () => InternalRow = {
+    val constructorsArray = new Array[Unit => MutableValue](st.fields.length)
+    var i = 0
+    while (i < st.fields.length) {
+      st.fields(i).dataType match {
+        case BooleanType => constructorsArray(i) = _ => new MutableBoolean
+        case ByteType => constructorsArray(i) = _ => new MutableByte
+        case ShortType => constructorsArray(i) = _ => new MutableShort
+        // We use INT for DATE internally
+        case IntegerType | DateType => constructorsArray(i) = _ => new MutableInt
+        // We use Long for Timestamp internally
+        case LongType | TimestampType => constructorsArray(i) = _ => new MutableLong
+        case FloatType => constructorsArray(i) = _ => new MutableFloat
+        case DoubleType => constructorsArray(i) = _ => new MutableDouble
+        case _ => constructorsArray(i) = _ => new MutableAny
+      }
+      i += 1
+    }
+
+    () => {
+      val array = new Array[MutableValue](constructorsArray.length)
+      var i = 0
+      while (i < constructorsArray.length) {
+        array(i) = constructorsArray(i)(Unit)
+        i += 1
+      }
+      new SpecificInternalRow(array)
+    }

Review comment:
      The profiler showed some time being spent in the SpecificInternalRow constructor, and we saw improvements when moving to this model: based on the schema, we fill in an array of constructors and, for each data point, call those constructors one by one. In retrospect, the change can instead be made to the SpecificInternalRow constructor itself, which would benefit https://github.com/apache/spark/pull/29353 as well, so this has been reverted in a later commit. Read-time improvements can be found in the [pdf](https://github.com/apache/spark/files/5025167/AvroBenchmarks.pdf) attached to the PR, pg 2 under column B.
   
   The changes to the SpecificInternalRow constructor can be found here: https://github.com/apache/spark/pull/29353#issuecomment-669459288
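   
   A minimal sketch of the pattern described above (illustrative only, not the PR code; `rowCreator` is a hypothetical name): the per-field type dispatch is paid once when the creator is built, and the per-row hot path only invokes the cached constructors:
   
```scala
import org.apache.spark.sql.catalyst.expressions.{MutableAny, MutableInt, MutableLong,
  MutableValue, SpecificInternalRow}
import org.apache.spark.sql.types._

// Hypothetical helper: resolve one constructor per field up front.
def rowCreator(schema: StructType): () => SpecificInternalRow = {
  val ctors: Array[() => MutableValue] = schema.fields.map { f =>
    f.dataType match {
      case IntegerType | DateType   => () => new MutableInt  // DATE is stored as INT
      case LongType | TimestampType => () => new MutableLong // TIMESTAMP is stored as LONG
      case _                        => () => new MutableAny
    }
  }
  // Hot path: no pattern matching, just constructor calls.
  () => new SpecificInternalRow(ctors.map(_.apply()))
}
```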

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/SparkAvroDatumReader.scala
##########
@@ -0,0 +1,811 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io.IOException
+import java.math.BigDecimal
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
+import org.apache.avro.Conversions.DecimalConversion
+import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
+import org.apache.avro.Schema.Type._
+import org.apache.avro.generic.{GenericDatumReader, GenericFixed}
+import org.apache.avro.io.ResolvingDecoder
+import org.apache.avro.util.Utf8
+
+import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
+import org.apache.spark.sql.catalyst.expressions.{MutableAny, MutableBoolean, MutableByte,
+  MutableDouble, MutableFloat, MutableInt, MutableLong, MutableShort, MutableValue,
+  SpecificInternalRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils,
+  GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Reader to read data as catalyst format from avro decoders. Multiple calls
+ * return the same [[InternalRow]] object. If needed, caller should copy before making a new call.
+ *
+ * @param actual
+ * @param expected
+ * @param rootCatalystType
+ * @param datetimeRebaseMode
+ * @tparam T
+ */
+class SparkAvroDatumReader[T](

Review comment:
      This commit moves to using a native reader for Avro files. Previously we had Decoder => GenericAvroDatumReader => AvroDeserializer; now SparkAvroDatumReader reads directly from the Decoder. Array-reading performance degraded, which has been fixed in a later commit. Moving to the native reader also allows us to "skip" reading columns that are not needed; this change is also in a later commit.
   
   Read times can be found in the attached [pdf](https://github.com/apache/spark/files/5025167/AvroBenchmarks.pdf) on pg 2 under column D.
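   
   A rough sketch of the difference (hypothetical two-field record with layout `(id: int, ts: long)`; not the PR code): values are consumed straight off the ResolvingDecoder and set on a mutable row, instead of materializing a GenericRecord first and converting it afterwards:
   
```scala
import org.apache.avro.io.ResolvingDecoder
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical reader for a record with fields (id: int, ts: long).
def readIdTsRecord(in: ResolvingDecoder, row: InternalRow): Unit = {
  // readFieldOrder resolves writer vs. reader schema once per record.
  val fields = in.readFieldOrder()
  var i = 0
  while (i < fields.length) {
    fields(i).pos() match {
      case 0 => row.setInt(0, in.readInt())   // "id"
      case 1 => row.setLong(1, in.readLong()) // "ts"
      case _ => ()                            // no other fields in this sketch
    }
    i += 1
  }
}
```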

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/SparkAvroDatumWriter.scala
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.nio.ByteBuffer
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
+import org.apache.avro.Conversions.DecimalConversion
+import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
+import org.apache.avro.Schema.Type._
+import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
+import org.apache.avro.generic.GenericDatumWriter
+import org.apache.avro.io.Encoder
+import org.apache.avro.util.Utf8
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.types._
+
+/**
+ * Writer to write data in catalyst format to avro encoders
+ *
+ * @param rootAvroType
+ * @param rootCatalystType
+ * @param nullable
+ * @tparam D
+ */
+class SparkAvroDatumWriter[D](rootAvroType: Schema, rootCatalystType: DataType, nullable: Boolean)

Review comment:
      This commit moves to using a native writer for Avro files. Previously we had AvroSerializer => GenericAvroDatumWriter => Encoder; now SparkAvroDatumWriter writes directly to the Encoder. For example, previously we would first fill up a HashMap in AvroSerializer and then GenericAvroDatumWriter (ReflectAvroDatumWriter before the first commit in this PR) would write it out to the Encoder; with this change, we write directly to the Encoder. Similarly, unions no longer need to be resolved twice.
   
   Write-time improvements can be found in the attached [pdf](https://github.com/apache/spark/files/5025167/AvroBenchmarks.pdf) on pg 3 under column D.
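   
   A rough sketch of the write side (hypothetical two-field schema where the second field is a `["null","long"]` union; not the PR code): values go from the InternalRow straight to the Avro Encoder, and the union branch is chosen exactly once:
   
```scala
import org.apache.avro.io.Encoder
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical writer for a record with fields (id: int, ts: ["null","long"]).
def writeIdTsRecord(row: InternalRow, out: Encoder): Unit = {
  out.writeInt(row.getInt(0))     // "id"
  if (row.isNullAt(1)) {
    out.writeIndex(0)             // union branch 0 assumed to be "null"
    out.writeNull()
  } else {
    out.writeIndex(1)             // union branch 1 assumed to be "long"
    out.writeLong(row.getLong(1)) // "ts"
  }
}
```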

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/SparkAvroDatumReader.scala
##########
@@ -452,71 +452,73 @@ class SparkAvroDatumReader[T](
 
   private[this] def getArrayReader(avroType: Schema,
       elementType: DataType,
-      path: List[String],
-      reuseObj: Boolean
+      path: List[String]
   ): (CatalystDataUpdater, Int, ResolvingDecoder) => Unit = {
    val elementReader = newReader(avroType.getElementType, elementType, path, false)
-    val array = new ArrayBuffer[Any]
-    val arrayUpdater = new ArrayBufferUpdater(array)
-    val toArrayConverter = getToArrayDataConverter(elementType, array)
+    val arrayCreator = getArrayDataCreator(elementType)
+    val arrayExpander = getArrayDataExpander(elementType)
 
     (updater, ordinal, in) => {
       var length = in.readArrayStart()
-      array.sizeHint(length.toInt)
+      var array = arrayCreator(length) // if (length == 0) 0 else 1)
+      val arrayUpdater = new ArrayDataUpdater(array)
 
+      var base: Int = 0
       while (length > 0) {
         var i = 0
         while (i < length) {
-          elementReader(arrayUpdater, i, in)
+          elementReader(arrayUpdater, base + i, in)
           i += 1
+          // array = arrayExpander(arrayUpdater, if (i == length) 0 else 1)
         }
+        base += length.toInt
         length = in.arrayNext()
-        array.sizeHint((array.length + length).toInt)
+        array = arrayExpander(arrayUpdater, length) // if (length == 0) 0 else 1)
       }
 
-      updater.set(ordinal, toArrayConverter())
-      array.clear()
+      updater.set(ordinal, array)
     }
   }

Review comment:
      In this commit, arrays are read directly into ArrayData. This requires the ability to "expand" GenericArrayData and UnsafeArrayData; one method was added to UnsafeArrayData: https://github.com/apache/spark/pull/29354/commits/2c62ac9ae960582058e96860002f7768eebb95f2#r465994024 (the URL does not seem to work, but it points to the comment on UnsafeArrayData below).
   
   Read-time improvements can be found in the benchmarks [pdf](https://github.com/apache/spark/files/5025167/AvroBenchmarks.pdf) on pg 2 under column G.
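   
   For the GenericArrayData case, "expand" boils down to allocating a larger backing array and copying the elements read so far when `arrayNext()` reports another block (a hedged sketch of the idea; the `expandGeneric` helper is hypothetical and is not the UnsafeArrayData method added in the PR):
   
```scala
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}

// Hypothetical expander: grow the array by `extra` slots reported by the next Avro block.
def expandGeneric(current: GenericArrayData, extra: Long): ArrayData = {
  if (extra == 0) {
    current
  } else {
    val grown = new Array[Any](current.numElements() + extra.toInt)
    System.arraycopy(current.array, 0, grown, 0, current.numElements())
    new GenericArrayData(grown)
  }
}
```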

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/SparkAvroDatumReader.scala
##########
@@ -452,71 +452,73 @@ class SparkAvroDatumReader[T](
 
   private[this] def getArrayReader(avroType: Schema,
       elementType: DataType,
-      path: List[String],
-      reuseObj: Boolean
+      path: List[String]
   ): (CatalystDataUpdater, Int, ResolvingDecoder) => Unit = {
    val elementReader = newReader(avroType.getElementType, elementType, path, false)
-    val array = new ArrayBuffer[Any]
-    val arrayUpdater = new ArrayBufferUpdater(array)
-    val toArrayConverter = getToArrayDataConverter(elementType, array)
+    val arrayCreator = getArrayDataCreator(elementType)
+    val arrayExpander = getArrayDataExpander(elementType)
 
     (updater, ordinal, in) => {
       var length = in.readArrayStart()
-      array.sizeHint(length.toInt)
+      var array = arrayCreator(length) // if (length == 0) 0 else 1)
+      val arrayUpdater = new ArrayDataUpdater(array)
 
+      var base: Int = 0
       while (length > 0) {
         var i = 0
         while (i < length) {
-          elementReader(arrayUpdater, i, in)
+          elementReader(arrayUpdater, base + i, in)
           i += 1
+          // array = arrayExpander(arrayUpdater, if (i == length) 0 else 1)
         }
+        base += length.toInt
         length = in.arrayNext()
-        array.sizeHint((array.length + length).toInt)
+        array = arrayExpander(arrayUpdater, length) // if (length == 0) 0 else 1)
       }
 
-      updater.set(ordinal, toArrayConverter())
-      array.clear()
+      updater.set(ordinal, array)
     }
   }
 
   private[this] def getMapReader(avroType: Schema,
       keyType: DataType,
       valueType: DataType,
-      path: List[String],
-      reuseObj: Boolean
+      path: List[String]
   ): (CatalystDataUpdater, Int, ResolvingDecoder) => Unit = {
    val keyReader = newReader(SchemaBuilder.builder().stringType(), StringType, path, false)
-    val keyArray = new ArrayBuffer[Any]
-    val keyArrayUpdater = new ArrayBufferUpdater(keyArray)
-    val toKeyArrayConverter = getToArrayDataConverter(keyType, keyArray)
+    val keyArrayCreator = getArrayDataCreator(keyType)
+    val keyArrayExpander = getArrayDataExpander(keyType)
 
     val valueReader = newReader(avroType.getValueType, valueType, path, false)
-    val valueArray = new ArrayBuffer[Any]
-    val valueArrayUpdater = new ArrayBufferUpdater(valueArray)
-    val toValueArrayConverter = getToArrayDataConverter(valueType, valueArray)
+    val valueArrayCreator = getArrayDataCreator(valueType)
+    val valueArrayExpander = getArrayDataExpander(valueType)
 
     (updater, ordinal, in) => {
       var length = in.readMapStart()
-      keyArray.sizeHint(length.toInt)
-      valueArray.sizeHint(length.toInt)
+      var keyArray = keyArrayCreator(length) // if (length == 0) 0 else 1)
+      var valueArray = valueArrayCreator(length) // if (length == 0) 0 else 1)
+      val keyArrayUpdater = new ArrayDataUpdater(keyArray)
+      val valueArrayUpdater = new ArrayDataUpdater(valueArray)
 
+      var base: Int = 0
       while (length > 0) {
         var i = 0
         while (i < length) {
-          keyReader(keyArrayUpdater, i, in)
-          valueReader(valueArrayUpdater, i, in)
+          keyReader(keyArrayUpdater, base + i, in)
+          valueReader(valueArrayUpdater, base + i, in)
           i += 1
+          // keyArray = keyArrayExpander(keyArrayUpdater, if (i == length) 0 else 1)
+          // valueArray = valueArrayExpander(valueArrayUpdater, if (i == length) 0 else 1)
         }
+        base += length.toInt
         length = in.mapNext()
-        keyArray.sizeHint((keyArray.length + length).toInt)
-        valueArray.sizeHint((valueArray.length + length).toInt)
+        keyArray = keyArrayExpander(keyArrayUpdater, length) // if (length == 0) 0 else 1)
+        valueArray = valueArrayExpander(valueArrayUpdater, length) // if (length == 0) 0 else 1)
       }
 
      // The Avro map will never have null or duplicated map keys, it's safe to create a
      // ArrayBasedMapData directly here.
-      updater.set(ordinal, new ArrayBasedMapData(toKeyArrayConverter(), toValueArrayConverter()))
-      keyArray.clear()
-      valueArray.clear()
+      updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))
     }
   }

Review comment:
      The changes to reading maps are the same as those for reading arrays: https://github.com/apache/spark/pull/29354/commits/2c62ac9ae960582058e96860002f7768eebb95f2#r465994403 (the URL does not seem to work, but it links to the comment on the array reader method above).

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/SparkAvroDatumReader.scala
##########
@@ -420,10 +426,27 @@ class SparkAvroDatumReader[T](
         structReader(rowUpdater, in)
         updater.set(ordinal, row)
     } else {
-      (updater, ordinal, in) =>
-        val row = createRow()
-        structReader(new RowUpdater(row), in)
+      val rowPool = ArrayBuffer.empty[InternalRow]
+      val rowUpdaterPool = ArrayBuffer.empty[RowUpdater]
+      val i = reuseIndices.length
+      reuseIndices += 0
+

Review comment:
      These changes did not result in any improvements, so they were removed later on. Read times are in column F on pg 2 of the benchmark [pdf](https://github.com/apache/spark/files/5025167/AvroBenchmarks.pdf) attached to this PR.

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/SparkAvroDatumReader.scala
##########
@@ -809,51 +735,111 @@ class SparkAvroDatumReader[T](
     }
   }
 
-  /**
-   * A base interface for updating values inside catalyst data structure like `InternalRow` and
-   * `ArrayData`.
-   */
-  sealed trait CatalystDataUpdater {
-    def set(ordinal: Int, value: Any): Unit
-
-    def setNullAt(ordinal: Int): Unit = set(ordinal, null)
-    def setBoolean(ordinal: Int, value: Boolean): Unit = set(ordinal, value)
-    def setByte(ordinal: Int, value: Byte): Unit = set(ordinal, value)
-    def setShort(ordinal: Int, value: Short): Unit = set(ordinal, value)
-    def setInt(ordinal: Int, value: Int): Unit = set(ordinal, value)
-    def setLong(ordinal: Int, value: Long): Unit = set(ordinal, value)
-    def setDouble(ordinal: Int, value: Double): Unit = set(ordinal, value)
-    def setFloat(ordinal: Int, value: Float): Unit = set(ordinal, value)
-    def setDecimal(ordinal: Int, value: Decimal): Unit = set(ordinal, value)
-  }
-
-  final class RowUpdater(row: InternalRow) extends CatalystDataUpdater {
-    override def set(ordinal: Int, value: Any): Unit = row.update(ordinal, value)
-
-    override def setNullAt(ordinal: Int): Unit = row.setNullAt(ordinal)
-    override def setBoolean(ordinal: Int, value: Boolean): Unit = row.setBoolean(ordinal, value)
-    override def setByte(ordinal: Int, value: Byte): Unit = row.setByte(ordinal, value)
-    override def setShort(ordinal: Int, value: Short): Unit = row.setShort(ordinal, value)
-    override def setInt(ordinal: Int, value: Int): Unit = row.setInt(ordinal, value)
-    override def setLong(ordinal: Int, value: Long): Unit = row.setLong(ordinal, value)
-    override def setDouble(ordinal: Int, value: Double): Unit = row.setDouble(ordinal, value)
-    override def setFloat(ordinal: Int, value: Float): Unit = row.setFloat(ordinal, value)
-    override def setDecimal(ordinal: Int, value: Decimal): Unit =
-      row.setDecimal(ordinal, value, value.precision)
-  }
-
-  final class ArrayDataUpdater(var array: ArrayData)
-      extends CatalystDataUpdater {
-    override def set(ordinal: Int, value: Any): Unit = array.update(ordinal, value)
-
-    override def setNullAt(ordinal: Int): Unit = array.setNullAt(ordinal)
-    override def setBoolean(ordinal: Int, value: Boolean): Unit = array.setBoolean(ordinal, value)
-    override def setByte(ordinal: Int, value: Byte): Unit = array.setByte(ordinal, value)
-    override def setShort(ordinal: Int, value: Short): Unit = array.setShort(ordinal, value)
-    override def setInt(ordinal: Int, value: Int): Unit = array.setInt(ordinal, value)
-    override def setLong(ordinal: Int, value: Long): Unit = array.setLong(ordinal, value)
-    override def setDouble(ordinal: Int, value: Double): Unit = array.setDouble(ordinal, value)
-    override def setFloat(ordinal: Int, value: Float): Unit = array.setFloat(ordinal, value)
-    override def setDecimal(ordinal: Int, value: Decimal): Unit = array.update(ordinal, value)
+  /** Function for skipping */
+
+  // Note: Currently GenericDatumReader.skip is not working as expected, eg reading an
+  // INT for an ENUM type and not reading NULL (the latter has been fixed in
+  // https://github.com/apache/avro/commit/d3a2b149aa92608956fb0b163eb1bea06ef2c05a)
+  def makeSkip(schema: Schema): ResolvingDecoder => Unit = schema.getType match {
+    case RECORD =>
+      val fieldSkips = schema.getFields.asScala.map(field => makeSkip(field.schema))
+      in => fieldSkips.foreach(f => f(in))
+
+    case ARRAY =>
+      val elementSkip = makeSkip(schema.getElementType)
+      in => {
+        var l = in.skipArray()
+        while (l > 0) {
+          var i = 0
+          while (i < l) {
+            elementSkip(in)
+            i += 1
+          }
+          l = in.skipArray()
+        }
+      }
+
+    case MAP =>
+      val valueSkip = makeSkip(schema.getValueType)
+      in => {
+        var l = in.skipMap()
+        while (l > 0) {
+          var i = 0
+          while (i < l) {
+            in.skipString()
+            valueSkip(in)
+            i += 1
+          }
+          l = in.skipMap()
+        }
+      }
+
+    case UNION =>
+      val typeSkips = schema.getTypes.asScala.map(typ => makeSkip(typ))
+      in => typeSkips(in.readIndex)(in)
+
+    case FIXED =>
+      val fixedSize = schema.getFixedSize
+      in => in.skipFixed(fixedSize)
+
+    case ENUM => in => in.readEnum()
+    case STRING => in => in.skipString()
+    case BYTES => in => in.skipBytes()
+    case INT => in => in.readInt()
+    case LONG => in => in.readLong()
+    case FLOAT => in => in.readFloat()
+    case DOUBLE => in => in.readDouble()
+    case BOOLEAN => in => in.readBoolean()
+    case NULL => in => in.readNull()
+    case _ =>
+      throw new RuntimeException("Unknown type: " + schema)
   }
 }

Review comment:
      The skip function in GenericDatumReader misses a couple of cases, so this one was written instead. The benefits of skipping can be seen in the Single Column Scan cases of the read benchmarks (pg 2, column K of the benchmark [pdf](https://github.com/apache/spark/files/5025167/AvroBenchmarks.pdf)) as well as in the Pruning benchmark (pg 4, column K of the same [pdf](https://github.com/apache/spark/files/5025167/AvroBenchmarks.pdf)).
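   
   A hedged sketch of how the skip functions can be wired up for pruned columns (the `fieldConsumers` helper is hypothetical; only `makeSkip` above is from the PR): every writer-side field gets either a real reader or its skip function, so bytes for unneeded columns are consumed without building Catalyst values:
   
```scala
import scala.collection.JavaConverters._

import org.apache.avro.Schema
import org.apache.avro.io.ResolvingDecoder

// Hypothetical wiring: choose a consumer per writer-side field.
def fieldConsumers(
    writerSchema: Schema,
    requested: Set[String],
    makeSkip: Schema => ResolvingDecoder => Unit,
    makeRead: Schema.Field => ResolvingDecoder => Unit): Array[ResolvingDecoder => Unit] = {
  writerSchema.getFields.asScala.map { field =>
    if (requested.contains(field.name())) makeRead(field)  // requested column: read it
    else makeSkip(field.schema())                          // pruned column: just skip its bytes
  }.toArray
}
```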




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
