This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 66d5a00  [SPARK-35817][SQL] Restore performance of queries against 
wide Avro tables
66d5a00 is described below

commit 66d5a0049a638cec7c70566ea880897651aa95f1
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Wed Jun 23 22:36:56 2021 +0800

    [SPARK-35817][SQL] Restore performance of queries against wide Avro tables
    
    ### What changes were proposed in this pull request?
    
    When creating a record writer in an AvroDeserializer, or creating a struct 
converter in an AvroSerializer, look up Avro fields using a map rather than 
scanning the entire list of Avro fields.
    
    ### Why are the changes needed?
    
    A query against an Avro table can be quite slow when all of the following are true:
    
    * There are many columns in the Avro file
    * The query contains a wide projection
    * There are many splits in the input
    * Some of the splits are read serially (e.g., fewer executors than there are 
tasks)
    
    A write to an Avro table can be quite slow when all of the following are true:
    
    * There are many columns in the new rows
    * The operation is creating many files
    
    For example, a single-threaded query against a 6000 column Avro data set 
with 50K rows and 20 files takes less than a minute with Spark 3.0.1 but over 7 
minutes with Spark 3.2.0-SNAPSHOT.
    
    This PR restores the faster time.
    
    For the 1000 column read benchmark:
    Before patch: 108447 ms
    After patch: 35925 ms
    percent improvement: 66%
    
    For the 1000 column write benchmark:
    Before patch: 123307 ms
    After patch: 42313 ms
    percent improvement: 65%
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    * Ran existing unit tests
    * Added new unit tests
    * Added new benchmarks
    
    Closes #32969 from bersprockets/SPARK-35817.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../avro/benchmarks/AvroReadBenchmark-results.txt  | 115 +++++++++++----------
 .../avro/benchmarks/AvroWriteBenchmark-results.txt |  20 ++--
 .../apache/spark/sql/avro/AvroDeserializer.scala   |   3 +-
 .../org/apache/spark/sql/avro/AvroSerializer.scala |   4 +-
 .../org/apache/spark/sql/avro/AvroUtils.scala      |  47 +++++----
 .../spark/sql/avro/AvroSchemaHelperSuite.scala     |  67 ++++++++++++
 .../execution/benchmark/AvroReadBenchmark.scala    |  31 ++++++
 .../execution/benchmark/AvroWriteBenchmark.scala   |  32 ++++++
 8 files changed, 239 insertions(+), 80 deletions(-)

diff --git a/external/avro/benchmarks/AvroReadBenchmark-results.txt 
b/external/avro/benchmarks/AvroReadBenchmark-results.txt
index f77db2d..5483cf6 100644
--- a/external/avro/benchmarks/AvroReadBenchmark-results.txt
+++ b/external/avro/benchmarks/AvroReadBenchmark-results.txt
@@ -2,129 +2,140 @@
 SQL Single Numeric Column Scan
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 SQL Single TINYINT Column Scan:           Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum                                                2802           2826         
 34          5.6         178.1       1.0X
+Sum                                                2648           2658         
 15          5.9         168.3       1.0X
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 SQL Single SMALLINT Column Scan:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum                                                2786           2810         
 35          5.6         177.1       1.0X
+Sum                                                2584           2624         
 56          6.1         164.3       1.0X
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 SQL Single INT Column Scan:               Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum                                                2808           2817         
 13          5.6         178.5       1.0X
+Sum                                                2611           2612         
  2          6.0         166.0       1.0X
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 SQL Single BIGINT Column Scan:            Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum                                                3222           3224         
  3          4.9         204.9       1.0X
+Sum                                                2861           2866         
  7          5.5         181.9       1.0X
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 SQL Single FLOAT Column Scan:             Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum                                                2827           2844         
 24          5.6         179.7       1.0X
+Sum                                                2519           2528         
 13          6.2         160.1       1.0X
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 SQL Single DOUBLE Column Scan:            Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum                                                2910           2924         
 20          5.4         185.0       1.0X
+Sum                                                2584           2589         
  7          6.1         164.3       1.0X
 
 
 
================================================================================================
 Int and String Scan
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 Int and String Scan:                      Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum of columns                                     4575           4580         
  7          2.3         436.3       1.0X
+Sum of columns                                     4097           4098         
  1          2.6         390.7       1.0X
 
 
 
================================================================================================
 Partitioned Table Scan
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 Partitioned Table:                        Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Data column                                        3252           3271         
 27          4.8         206.8       1.0X
-Partition column                                   2905           2907         
  3          5.4         184.7       1.1X
-Both columns                                       3385           3398         
 18          4.6         215.2       1.0X
+Data column                                        2918           2920         
  3          5.4         185.5       1.0X
+Partition column                                   2603           2605         
  2          6.0         165.5       1.1X
+Both columns                                       2949           2953         
  5          5.3         187.5       1.0X
 
 
 
================================================================================================
 Repeated String Scan
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 Repeated String:                          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               3275           3278         
  3          3.2         312.4       1.0X
+Sum of string length                               2759           2763         
  6          3.8         263.1       1.0X
 
 
 
================================================================================================
 String with Nulls Scan
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 String with Nulls Scan (0.0%):            Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               5202           5219         
 24          2.0         496.1       1.0X
+Sum of string length                               4444           4449         
  7          2.4         423.8       1.0X
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 String with Nulls Scan (50.0%):           Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               3360           3381         
 29          3.1         320.5       1.0X
+Sum of string length                               2892           2894         
  3          3.6         275.8       1.0X
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 String with Nulls Scan (95.0%):           Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum of string length                               1917           1936         
 28          5.5         182.8       1.0X
+Sum of string length                               1693           1696         
  5          6.2         161.4       1.0X
+
+
+================================================================================================
+Select All From Wide Columns
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
+Wide Column Scan from 1000 columns:       Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Select of all columns                             35653          35925         
384          0.0       71306.7       1.0X
 
 
 
================================================================================================
 Single Column Scan From Wide Columns
 
================================================================================================
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 Single Column Scan from 100 columns:      Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum of single column                               4348           4424         
107          0.2        4146.5       1.0X
+Sum of single column                               4102           4103         
  2          0.3        3911.6       1.0X
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 Single Column Scan from 200 columns:      Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum of single column                               8799           8806         
 10          0.1        8391.2       1.0X
+Sum of single column                               8014           8074         
 85          0.1        7642.4       1.0X
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 Single Column Scan from 300 columns:      Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Sum of single column                              12956          12990         
 49          0.1       12355.5       1.0X
+Sum of single column                              11980          11990         
 14          0.1       11425.5       1.0X
 
 
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 Filters pushdown:                         Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-w/o filters                                        9208           9269         
 63          0.1        9207.5       1.0X
-pushdown disabled                                  9073           9111         
 59          0.1        9072.7       1.0X
-w/ filters                                         3929           3947         
 18          0.3        3928.8       2.3X
+w/o filters                                        9014           9033         
 23          0.1        9014.2       1.0X
+pushdown disabled                                  8878           8900         
 23          0.1        8877.8       1.0X
+w/ filters                                         3700           3707         
  9          0.3        3699.7       2.4X
 
diff --git a/external/avro/benchmarks/AvroWriteBenchmark-results.txt 
b/external/avro/benchmarks/AvroWriteBenchmark-results.txt
index 26bb126..0f7a862 100644
--- a/external/avro/benchmarks/AvroWriteBenchmark-results.txt
+++ b/external/avro/benchmarks/AvroWriteBenchmark-results.txt
@@ -1,10 +1,16 @@
-OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
 Avro writer benchmark:                    Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Output Single Int Column                           2478           2537         
 83          6.3         157.6       1.0X
-Output Single Double Column                        2636           2652         
 21          6.0         167.6       0.9X
-Output Int and String Column                       5922           6039         
166          2.7         376.5       0.4X
-Output Partitions                                  4158           4305         
207          3.8         264.3       0.6X
-Output Buckets                                     5486           5534         
 68          2.9         348.8       0.5X
+Output Single Int Column                           2767           2813         
 65          5.7         175.9       1.0X
+Output Single Double Column                        2973           2975         
  2          5.3         189.0       0.9X
+Output Int and String Column                       6024           6036         
 16          2.6         383.0       0.5X
+Output Partitions                                  4610           4709         
140          3.4         293.1       0.6X
+Output Buckets                                     6177           6209         
 45          2.5         392.7       0.4X
+
+OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64
+Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
+Write wide rows into 20 files:            Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Write wide rows                                   40838          40936         
139          0.0       81675.4       1.0X
 
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index a19a7b0..6699233 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -338,11 +338,12 @@ private[sql] class AvroDeserializer(
     val validFieldIndexes = ArrayBuffer.empty[Int]
     val fieldWriters = ArrayBuffer.empty[(CatalystDataUpdater, Any) => Unit]
 
+    val avroSchemaHelper = new AvroUtils.AvroSchemaHelper(avroType, avroPath)
     val length = catalystType.length
     var i = 0
     while (i < length) {
       val catalystField = catalystType.fields(i)
-      AvroUtils.getAvroFieldByName(avroType, catalystField.name, avroPath) 
match {
+      avroSchemaHelper.getFieldByName(catalystField.name) match {
         case Some(avroField) =>
           validFieldIndexes += avroField.pos()
 
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 8665757..710c191 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -250,11 +250,11 @@ private[sql] class AvroSerializer(
         s"Avro $avroPathStr schema length (${avroFields.size}) doesn't match " 
+
         s"SQL ${toFieldStr(catalystPath)} schema length 
(${catalystStruct.length})")
     }
+    val avroSchemaHelper = new AvroUtils.AvroSchemaHelper(avroStruct, avroPath)
 
     val (avroIndices: Array[Int], fieldConverters: Array[Converter]) =
       catalystStruct.map { catalystField =>
-        val avroField = AvroUtils
-            .getAvroFieldByName(avroStruct, catalystField.name, avroPath) 
match {
+        val avroField = avroSchemaHelper.getFieldByName(catalystField.name) 
match {
           case Some(f) => f
           case None => throw new IncompatibleSchemaException(s"Cannot find " +
               s"${toFieldStr(catalystPath :+ catalystField.name)} in Avro 
schema at $avroPathStr")
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index 74f4d0e..9dbb3b9 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.avro
 
 import java.io.{FileNotFoundException, IOException}
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 
@@ -203,33 +204,43 @@ private[sql] object AvroUtils extends Logging {
   }
 
   /**
-   * Extract a single field from `avroSchema` which has the desired field name,
-   * performing the matching with proper case sensitivity according to 
[[SQLConf.resolver]].
+   * Wraps an Avro Schema object so that field lookups are faster.
    *
-   * @param avroSchema The schema in which to search for the field. Must be of 
type RECORD.
-   * @param name The name of the field to search for.
+   * @param avroSchema The schema in which to search for fields. Must be of 
type RECORD.
    * @param avroPath The seq of parent field names leading to `avroSchema`.
-   * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
-   * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or 
contains multiple
-   *                                     fields matching `name` (i.e., 
case-insensitive matching
-   *                                     is used and `avroSchema` has two or 
more fields that have
-   *                                     the same name with difference case).
    */
-  private[avro] def getAvroFieldByName(
-      avroSchema: Schema,
-      name: String,
-      avroPath: Seq[String]): Option[Schema.Field] = {
+  class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) {
     if (avroSchema.getType != Schema.Type.RECORD) {
       throw new IncompatibleSchemaException(
         s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was: 
${avroSchema.getType}")
     }
-    avroSchema.getFields.asScala.filter(f => SQLConf.get.resolver(f.name(), 
name)).toSeq match {
-      case Seq(avroField) => Some(avroField)
-      case Seq() => None
-      case matches => throw new IncompatibleSchemaException(s"Searching for 
'$name' in Avro " +
+
+    private[this] val fieldMap = avroSchema.getFields.asScala
+      .groupBy(_.name.toLowerCase(Locale.ROOT))
+      .mapValues(_.toSeq) // toSeq needed for scala 2.13
+
+    /**
+     * Extract a single field from the contained avro schema which has the 
desired field name,
+     * performing the matching with proper case sensitivity according to 
SQLConf.resolver.
+     *
+     * @param name The name of the field to search for.
+     * @return `Some(match)` if a matching Avro field is found, otherwise 
`None`.
+     */
+    def getFieldByName(name: String): Option[Schema.Field] = {
+
+      // get candidates, ignoring case of field name
+      val candidates = fieldMap.get(name.toLowerCase(Locale.ROOT))
+        .getOrElse(Seq.empty[Schema.Field])
+
+      // search candidates, taking into account case sensitivity settings
+      candidates.filter(f => SQLConf.get.resolver(f.name(), name)) match {
+        case Seq(avroField) => Some(avroField)
+        case Seq() => None
+        case matches => throw new IncompatibleSchemaException(s"Searching for 
'$name' in Avro " +
           s"schema at ${toFieldStr(avroPath)} gave ${matches.size} matches. 
Candidates: " +
           matches.map(_.name()).mkString("[", ", ", "]")
-      )
+        )
+      }
     }
   }
 
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSchemaHelperSuite.scala
 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSchemaHelperSuite.scala
new file mode 100644
index 0000000..9723fed
--- /dev/null
+++ 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSchemaHelperSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import org.apache.avro.SchemaBuilder
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+class AvroSchemaHelperSuite extends SQLTestUtils with SharedSparkSession {
+
+  test("ensure schema is a record") {
+    val avroSchema = SchemaBuilder.builder().intType()
+
+    val msg = intercept[IncompatibleSchemaException] {
+      new AvroUtils.AvroSchemaHelper(avroSchema, Seq(""))
+    }.getMessage
+    assert(msg.contains("Attempting to treat int as a RECORD"))
+  }
+
+  test("handle mixed case field names") {
+    val catalystSchema = StructType(
+      StructField("a", IntegerType) ::
+      StructField("b", IntegerType) ::
+      StructField("A", IntegerType) ::
+      Nil
+    )
+
+    val avroSchema = SchemaConverters.toAvroType(catalystSchema)
+    val helper = new AvroUtils.AvroSchemaHelper(avroSchema, Seq(""))
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      assert(helper.getFieldByName("A").get.name() == "A")
+      assert(helper.getFieldByName("a").get.name() == "a")
+      assert(helper.getFieldByName("b").get.name() == "b")
+      assert(helper.getFieldByName("B").isEmpty)
+    }
+
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      Seq("a", "A").foreach { fieldName =>
+        withClue(s"looking for field name: $fieldName") {
+          val msg = intercept[IncompatibleSchemaException] {
+            helper.getFieldByName(fieldName)
+          }.getMessage
+          assert(msg.contains(s"Searching for '$fieldName' in Avro schema"))
+        }
+      }
+
+      assert(helper.getFieldByName("b").get.name() == "b")
+      assert(helper.getFieldByName("B").get.name() == "b")
+    }
+  }
+}
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala
 
b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala
index 01a78dc..7368543 100644
--- 
a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala
+++ 
b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala
@@ -191,6 +191,32 @@ object AvroReadBenchmark extends SqlBasedBenchmark {
     }
   }
 
+  private def wideColumnsBenchmark(values: Int, width: Int, files: Int): Unit 
= {
+    val benchmark =
+      new Benchmark(s"Wide Column Scan from $width columns", values, output = 
output)
+
+    withTempPath { dir =>
+      withTempTable("t1", "avroTable") {
+        import spark.implicits._
+        val middle = width / 2
+        val selectExpr = (1 to width).map(i => s"value as c$i")
+        spark.range(values).map(_ => Random.nextLong).toDF()
+          .selectExpr(selectExpr: _*)
+          .repartition(files) // ensure at least `files` number of splits (but 
maybe more)
+          .createOrReplaceTempView("t1")
+
+        prepareTable(dir, spark.sql("SELECT * FROM t1"))
+
+        benchmark.addCase("Select of all columns") { _ =>
+          spark.sql(s"SELECT * FROM avroTable").noop()
+        }
+
+        benchmark.run()
+      }
+    }
+
+  }
+
   private def filtersPushdownBenchmark(rowsNum: Int, numIters: Int): Unit = {
     val benchmark = new Benchmark("Filters pushdown", rowsNum, output = output)
     val colsNum = 100
@@ -265,6 +291,11 @@ object AvroReadBenchmark extends SqlBasedBenchmark {
         stringWithNullsScanBenchmark(1024 * 1024 * 10, fractionOfNulls)
       }
     }
+
+    runBenchmark("Select All From Wide Columns") {
+      wideColumnsBenchmark(500000, 1000, 20)
+    }
+
     runBenchmark("Single Column Scan From Wide Columns") {
       columnsBenchmark(1024 * 1024 * 1, 100)
       columnsBenchmark(1024 * 1024 * 1, 200)
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala
 
b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala
index 0b11434..7f9febb 100644
--- 
a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala
+++ 
b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.execution.benchmark
 
+import scala.util.Random
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.storage.StorageLevel
+
 /**
  * Benchmark to measure Avro data sources write performance.
  * To run this benchmark:
@@ -31,7 +36,34 @@ package org.apache.spark.sql.execution.benchmark
  *  }}}
  */
 object AvroWriteBenchmark extends DataSourceWriteBenchmark {
+  private def wideColumnsBenchmark: Unit = {
+    import spark.implicits._
+
+    withTempPath { dir =>
+      withTempTable("t1") {
+        val width = 1000
+        val values = 500000
+        val files = 20
+        val selectExpr = (1 to width).map(i => s"value as c$i")
+        // repartition to ensure we will write multiple files
+        val df = spark.range(values)
+          .map(_ => Random.nextInt).selectExpr(selectExpr: 
_*).repartition(files)
+          .persist(StorageLevel.DISK_ONLY)
+        // cache the data to ensure we are not benchmarking range or 
repartition
+        df.noop()
+        df.createOrReplaceTempView("t1")
+        val benchmark = new Benchmark(s"Write wide rows into $files files", 
values, output = output)
+        benchmark.addCase("Write wide rows") { _ =>
+          spark.sql("SELECT * FROM t1").
+            
write.format("avro").save(s"${dir.getCanonicalPath}/${Random.nextLong.abs}")
+        }
+        benchmark.run()
+      }
+    }
+  }
+
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     runDataSourceBenchmark("Avro")
+    wideColumnsBenchmark
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to