msamirkhan commented on a change in pull request #29354:
URL: https://github.com/apache/spark/pull/29354#discussion_r465998634



##########
File path: 
external/avro/src/main/scala/org/apache/spark/sql/avro/SparkAvroDatumReader.scala
##########
@@ -809,51 +735,111 @@ class SparkAvroDatumReader[T](
     }
   }
 
-  /**
-   * A base interface for updating values inside catalyst data structure like 
`InternalRow` and
-   * `ArrayData`.
-   */
-  sealed trait CatalystDataUpdater {
-    def set(ordinal: Int, value: Any): Unit
-
-    def setNullAt(ordinal: Int): Unit = set(ordinal, null)
-    def setBoolean(ordinal: Int, value: Boolean): Unit = set(ordinal, value)
-    def setByte(ordinal: Int, value: Byte): Unit = set(ordinal, value)
-    def setShort(ordinal: Int, value: Short): Unit = set(ordinal, value)
-    def setInt(ordinal: Int, value: Int): Unit = set(ordinal, value)
-    def setLong(ordinal: Int, value: Long): Unit = set(ordinal, value)
-    def setDouble(ordinal: Int, value: Double): Unit = set(ordinal, value)
-    def setFloat(ordinal: Int, value: Float): Unit = set(ordinal, value)
-    def setDecimal(ordinal: Int, value: Decimal): Unit = set(ordinal, value)
-  }
-
-  final class RowUpdater(row: InternalRow) extends CatalystDataUpdater {
-    override def set(ordinal: Int, value: Any): Unit = row.update(ordinal, 
value)
-
-    override def setNullAt(ordinal: Int): Unit = row.setNullAt(ordinal)
-    override def setBoolean(ordinal: Int, value: Boolean): Unit = 
row.setBoolean(ordinal, value)
-    override def setByte(ordinal: Int, value: Byte): Unit = 
row.setByte(ordinal, value)
-    override def setShort(ordinal: Int, value: Short): Unit = 
row.setShort(ordinal, value)
-    override def setInt(ordinal: Int, value: Int): Unit = row.setInt(ordinal, 
value)
-    override def setLong(ordinal: Int, value: Long): Unit = 
row.setLong(ordinal, value)
-    override def setDouble(ordinal: Int, value: Double): Unit = 
row.setDouble(ordinal, value)
-    override def setFloat(ordinal: Int, value: Float): Unit = 
row.setFloat(ordinal, value)
-    override def setDecimal(ordinal: Int, value: Decimal): Unit =
-      row.setDecimal(ordinal, value, value.precision)
-  }
-
-  final class ArrayDataUpdater(var array: ArrayData)
-      extends CatalystDataUpdater {
-    override def set(ordinal: Int, value: Any): Unit = array.update(ordinal, 
value)
-
-    override def setNullAt(ordinal: Int): Unit = array.setNullAt(ordinal)
-    override def setBoolean(ordinal: Int, value: Boolean): Unit = 
array.setBoolean(ordinal, value)
-    override def setByte(ordinal: Int, value: Byte): Unit = 
array.setByte(ordinal, value)
-    override def setShort(ordinal: Int, value: Short): Unit = 
array.setShort(ordinal, value)
-    override def setInt(ordinal: Int, value: Int): Unit = 
array.setInt(ordinal, value)
-    override def setLong(ordinal: Int, value: Long): Unit = 
array.setLong(ordinal, value)
-    override def setDouble(ordinal: Int, value: Double): Unit = 
array.setDouble(ordinal, value)
-    override def setFloat(ordinal: Int, value: Float): Unit = 
array.setFloat(ordinal, value)
-    override def setDecimal(ordinal: Int, value: Decimal): Unit = 
array.update(ordinal, value)
+  /** Function for skipping */
+
+  // Note: Currently GenericDatumReader.skip is not working as expected, eg 
reading an
+  // INT for an ENUM type and not reading NULL (the later has been fixed in
+  // 
https://github.com/apache/avro/commit/d3a2b149aa92608956fb0b163eb1bea06ef2c05a)
+  def makeSkip(schema: Schema): ResolvingDecoder => Unit = schema.getType 
match {
+    case RECORD =>
+      val fieldSkips = schema.getFields.asScala.map(field => 
makeSkip(field.schema))
+      in => fieldSkips.foreach(f => f(in))
+
+    case ARRAY =>
+      val elementSkip = makeSkip(schema.getElementType)
+      in => {
+        var l = in.skipArray()
+        while (l > 0) {
+          var i = 0
+          while (i < l) {
+            elementSkip(in)
+            i += 1
+          }
+          l = in.skipArray()
+        }
+      }
+
+    case MAP =>
+      val valueSkip = makeSkip(schema.getValueType)
+      in => {
+        var l = in.skipMap()
+        while (l > 0) {
+          var i = 0
+          while (i < l) {
+            in.skipString()
+            valueSkip(in)
+            i += 1
+          }
+          l = in.skipMap()
+        }
+      }
+
+    case UNION =>
+      val typeSkips = schema.getTypes.asScala.map(typ => makeSkip(typ))
+      in => typeSkips(in.readIndex)(in)
+
+    case FIXED =>
+      val fixedSize = schema.getFixedSize
+      in => in.skipFixed(fixedSize)
+
+    case ENUM => in => in.readEnum()
+    case STRING => in => in.skipString()
+    case BYTES => in => in.skipBytes()
+    case INT => in => in.readInt()
+    case LONG => in => in.readLong()
+    case FLOAT => in => in.readFloat()
+    case DOUBLE => in => in.readDouble()
+    case BOOLEAN => in => in.readBoolean()
+    case NULL => in => in.readNull()
+    case _ =>
+      throw new RuntimeException("Unknown type: " + schema)
   }
 }

Review comment:
       Did not have the filters pushdown cases in the AvroReadBenchmark in 
branch-3.0, but running on master I did see some improvements there as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to