This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 66d5a00 [SPARK-35817][SQL] Restore performance of queries against wide Avro tables 66d5a00 is described below commit 66d5a0049a638cec7c70566ea880897651aa95f1 Author: Bruce Robbins <bersprock...@gmail.com> AuthorDate: Wed Jun 23 22:36:56 2021 +0800 [SPARK-35817][SQL] Restore performance of queries against wide Avro tables ### What changes were proposed in this pull request? When creating a record writer in an AvroDeserializer, or creating a struct converter in an AvroSerializer, look up Avro fields using a map rather than scanning the entire list of Avro fields. ### Why are the changes needed? A query against an Avro table can be quite slow when all of the following are true: * There are many columns in the Avro file * The query contains a wide projection * There are many splits in the input * Some of the splits are read serially (e.g., fewer executors than there are tasks) A write to an Avro table can be quite slow when all of the following are true: * There are many columns in the new rows * The operation is creating many files For example, a single-threaded query against a 6000-column Avro data set with 50K rows and 20 files takes less than a minute with Spark 3.0.1 but over 7 minutes with Spark 3.2.0-SNAPSHOT. This PR restores the faster time. For the 1000-column read benchmark: Before patch: 108447 ms After patch: 35925 ms percent improvement: 66% For the 1000-column write benchmark: Before patch: 123307 ms After patch: 42313 ms percent improvement: 65% ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? * Ran existing unit tests * Added new unit tests * Added new benchmarks Closes #32969 from bersprockets/SPARK-35817. 
Authored-by: Bruce Robbins <bersprock...@gmail.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../avro/benchmarks/AvroReadBenchmark-results.txt | 115 +++++++++++---------- .../avro/benchmarks/AvroWriteBenchmark-results.txt | 20 ++-- .../apache/spark/sql/avro/AvroDeserializer.scala | 3 +- .../org/apache/spark/sql/avro/AvroSerializer.scala | 4 +- .../org/apache/spark/sql/avro/AvroUtils.scala | 47 +++++---- .../spark/sql/avro/AvroSchemaHelperSuite.scala | 67 ++++++++++++ .../execution/benchmark/AvroReadBenchmark.scala | 31 ++++++ .../execution/benchmark/AvroWriteBenchmark.scala | 32 ++++++ 8 files changed, 239 insertions(+), 80 deletions(-) diff --git a/external/avro/benchmarks/AvroReadBenchmark-results.txt b/external/avro/benchmarks/AvroReadBenchmark-results.txt index f77db2d..5483cf6 100644 --- a/external/avro/benchmarks/AvroReadBenchmark-results.txt +++ b/external/avro/benchmarks/AvroReadBenchmark-results.txt @@ -2,129 +2,140 @@ SQL Single Numeric Column Scan ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz SQL Single TINYINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2802 2826 34 5.6 178.1 1.0X +Sum 2648 2658 15 5.9 168.3 1.0X -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz SQL Single SMALLINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
------------------------------------------------------------------------------------------------------------------------ -Sum 2786 2810 35 5.6 177.1 1.0X +Sum 2584 2624 56 6.1 164.3 1.0X -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz SQL Single INT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2808 2817 13 5.6 178.5 1.0X +Sum 2611 2612 2 6.0 166.0 1.0X -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz SQL Single BIGINT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 3222 3224 3 4.9 204.9 1.0X +Sum 2861 2866 7 5.5 181.9 1.0X -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz SQL Single FLOAT Column Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2827 2844 24 5.6 179.7 1.0X +Sum 2519 2528 13 6.2 160.1 1.0X -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz SQL Single DOUBLE Column 
Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum 2910 2924 20 5.4 185.0 1.0X +Sum 2584 2589 7 6.1 164.3 1.0X ================================================================================================ Int and String Scan ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz Int and String Scan: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of columns 4575 4580 7 2.3 436.3 1.0X +Sum of columns 4097 4098 1 2.6 390.7 1.0X ================================================================================================ Partitioned Table Scan ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz Partitioned Table: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Data column 3252 3271 27 4.8 206.8 1.0X -Partition column 2905 2907 3 5.4 184.7 1.1X -Both columns 3385 3398 18 4.6 215.2 1.0X +Data column 2918 2920 3 5.4 185.5 1.0X +Partition column 2603 2605 2 6.0 165.5 1.1X +Both columns 2949 2953 5 5.3 187.5 1.0X 
================================================================================================ Repeated String Scan ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz Repeated String: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 3275 3278 3 3.2 312.4 1.0X +Sum of string length 2759 2763 6 3.8 263.1 1.0X ================================================================================================ String with Nulls Scan ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz String with Nulls Scan (0.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 5202 5219 24 2.0 496.1 1.0X +Sum of string length 4444 4449 7 2.4 423.8 1.0X -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz String with Nulls Scan (50.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 
3360 3381 29 3.1 320.5 1.0X +Sum of string length 2892 2894 3 3.6 275.8 1.0X -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz String with Nulls Scan (95.0%): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of string length 1917 1936 28 5.5 182.8 1.0X +Sum of string length 1693 1696 5 6.2 161.4 1.0X + + +================================================================================================ +Select All From Wide Columns +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz +Wide Column Scan from 1000 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Select of all columns 35653 35925 384 0.0 71306.7 1.0X ================================================================================================ Single Column Scan From Wide Columns ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz Single Column Scan from 100 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of single column 4348 4424 107 0.2 
4146.5 1.0X +Sum of single column 4102 4103 2 0.3 3911.6 1.0X -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz Single Column Scan from 200 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of single column 8799 8806 10 0.1 8391.2 1.0X +Sum of single column 8014 8074 85 0.1 7642.4 1.0X -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz Single Column Scan from 300 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Sum of single column 12956 12990 49 0.1 12355.5 1.0X +Sum of single column 11980 11990 14 0.1 11425.5 1.0X -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -w/o filters 9208 9269 63 0.1 9207.5 1.0X -pushdown disabled 9073 9111 59 0.1 9072.7 1.0X -w/ filters 3929 3947 18 0.3 3928.8 2.3X +w/o filters 9014 9033 23 0.1 9014.2 1.0X +pushdown disabled 8878 8900 23 0.1 8877.8 1.0X +w/ filters 3700 3707 9 0.3 3699.7 2.4X diff --git a/external/avro/benchmarks/AvroWriteBenchmark-results.txt 
b/external/avro/benchmarks/AvroWriteBenchmark-results.txt index 26bb126..0f7a862 100644 --- a/external/avro/benchmarks/AvroWriteBenchmark-results.txt +++ b/external/avro/benchmarks/AvroWriteBenchmark-results.txt @@ -1,10 +1,16 @@ -OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz Avro writer benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Output Single Int Column 2478 2537 83 6.3 157.6 1.0X -Output Single Double Column 2636 2652 21 6.0 167.6 0.9X -Output Int and String Column 5922 6039 166 2.7 376.5 0.4X -Output Partitions 4158 4305 207 3.8 264.3 0.6X -Output Buckets 5486 5534 68 2.9 348.8 0.5X +Output Single Int Column 2767 2813 65 5.7 175.9 1.0X +Output Single Double Column 2973 2975 2 5.3 189.0 0.9X +Output Int and String Column 6024 6036 16 2.6 383.0 0.5X +Output Partitions 4610 4709 140 3.4 293.1 0.6X +Output Buckets 6177 6209 45 2.5 392.7 0.4X + +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Linux 4.18.0-193.6.3.el8_2.x86_64 +Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz +Write wide rows into 20 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Write wide rows 40838 40936 139 0.0 81675.4 1.0X diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index a19a7b0..6699233 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -338,11 +338,12 @@ 
private[sql] class AvroDeserializer( val validFieldIndexes = ArrayBuffer.empty[Int] val fieldWriters = ArrayBuffer.empty[(CatalystDataUpdater, Any) => Unit] + val avroSchemaHelper = new AvroUtils.AvroSchemaHelper(avroType, avroPath) val length = catalystType.length var i = 0 while (i < length) { val catalystField = catalystType.fields(i) - AvroUtils.getAvroFieldByName(avroType, catalystField.name, avroPath) match { + avroSchemaHelper.getFieldByName(catalystField.name) match { case Some(avroField) => validFieldIndexes += avroField.pos() diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala index 8665757..710c191 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala @@ -250,11 +250,11 @@ private[sql] class AvroSerializer( s"Avro $avroPathStr schema length (${avroFields.size}) doesn't match " + s"SQL ${toFieldStr(catalystPath)} schema length (${catalystStruct.length})") } + val avroSchemaHelper = new AvroUtils.AvroSchemaHelper(avroStruct, avroPath) val (avroIndices: Array[Int], fieldConverters: Array[Converter]) = catalystStruct.map { catalystField => - val avroField = AvroUtils - .getAvroFieldByName(avroStruct, catalystField.name, avroPath) match { + val avroField = avroSchemaHelper.getFieldByName(catalystField.name) match { case Some(f) => f case None => throw new IncompatibleSchemaException(s"Cannot find " + s"${toFieldStr(catalystPath :+ catalystField.name)} in Avro schema at $avroPathStr") diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index 74f4d0e..9dbb3b9 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -17,6 
+17,7 @@ package org.apache.spark.sql.avro import java.io.{FileNotFoundException, IOException} +import java.util.Locale import scala.collection.JavaConverters._ @@ -203,33 +204,43 @@ private[sql] object AvroUtils extends Logging { } /** - * Extract a single field from `avroSchema` which has the desired field name, - * performing the matching with proper case sensitivity according to [[SQLConf.resolver]]. + * Wraps an Avro Schema object so that field lookups are faster. * - * @param avroSchema The schema in which to search for the field. Must be of type RECORD. - * @param name The name of the field to search for. + * @param avroSchema The schema in which to search for fields. Must be of type RECORD. * @param avroPath The seq of parent field names leading to `avroSchema`. - * @return `Some(match)` if a matching Avro field is found, otherwise `None`. - * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or contains multiple - * fields matching `name` (i.e., case-insensitive matching - * is used and `avroSchema` has two or more fields that have - * the same name with difference case). 
*/ - private[avro] def getAvroFieldByName( - avroSchema: Schema, - name: String, - avroPath: Seq[String]): Option[Schema.Field] = { + class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) { if (avroSchema.getType != Schema.Type.RECORD) { throw new IncompatibleSchemaException( s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was: ${avroSchema.getType}") } - avroSchema.getFields.asScala.filter(f => SQLConf.get.resolver(f.name(), name)).toSeq match { - case Seq(avroField) => Some(avroField) - case Seq() => None - case matches => throw new IncompatibleSchemaException(s"Searching for '$name' in Avro " + + + private[this] val fieldMap = avroSchema.getFields.asScala + .groupBy(_.name.toLowerCase(Locale.ROOT)) + .mapValues(_.toSeq) // toSeq needed for scala 2.13 + + /** + * Extract a single field from the contained avro schema which has the desired field name, + * performing the matching with proper case sensitivity according to SQLConf.resolver. + * + * @param name The name of the field to search for. + * @return `Some(match)` if a matching Avro field is found, otherwise `None`. + */ + def getFieldByName(name: String): Option[Schema.Field] = { + + // get candidates, ignoring case of field name + val candidates = fieldMap.get(name.toLowerCase(Locale.ROOT)) + .getOrElse(Seq.empty[Schema.Field]) + + // search candidates, taking into account case sensitivity settings + candidates.filter(f => SQLConf.get.resolver(f.name(), name)) match { + case Seq(avroField) => Some(avroField) + case Seq() => None + case matches => throw new IncompatibleSchemaException(s"Searching for '$name' in Avro " + s"schema at ${toFieldStr(avroPath)} gave ${matches.size} matches. 
Candidates: " + matches.map(_.name()).mkString("[", ", ", "]") - ) + ) + } } } diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSchemaHelperSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSchemaHelperSuite.scala new file mode 100644 index 0000000..9723fed --- /dev/null +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSchemaHelperSuite.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.avro + +import org.apache.avro.SchemaBuilder + +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class AvroSchemaHelperSuite extends SQLTestUtils with SharedSparkSession { + + test("ensure schema is a record") { + val avroSchema = SchemaBuilder.builder().intType() + + val msg = intercept[IncompatibleSchemaException] { + new AvroUtils.AvroSchemaHelper(avroSchema, Seq("")) + }.getMessage + assert(msg.contains("Attempting to treat int as a RECORD")) + } + + test("handle mixed case field names") { + val catalystSchema = StructType( + StructField("a", IntegerType) :: + StructField("b", IntegerType) :: + StructField("A", IntegerType) :: + Nil + ) + + val avroSchema = SchemaConverters.toAvroType(catalystSchema) + val helper = new AvroUtils.AvroSchemaHelper(avroSchema, Seq("")) + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + assert(helper.getFieldByName("A").get.name() == "A") + assert(helper.getFieldByName("a").get.name() == "a") + assert(helper.getFieldByName("b").get.name() == "b") + assert(helper.getFieldByName("B").isEmpty) + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + Seq("a", "A").foreach { fieldName => + withClue(s"looking for field name: $fieldName") { + val msg = intercept[IncompatibleSchemaException] { + helper.getFieldByName(fieldName) + }.getMessage + assert(msg.contains(s"Searching for '$fieldName' in Avro schema")) + } + } + + assert(helper.getFieldByName("b").get.name() == "b") + assert(helper.getFieldByName("B").get.name() == "b") + } + } +} diff --git a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala index 01a78dc..7368543 100644 --- 
a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroReadBenchmark.scala @@ -191,6 +191,32 @@ object AvroReadBenchmark extends SqlBasedBenchmark { } } + private def wideColumnsBenchmark(values: Int, width: Int, files: Int): Unit = { + val benchmark = + new Benchmark(s"Wide Column Scan from $width columns", values, output = output) + + withTempPath { dir => + withTempTable("t1", "avroTable") { + import spark.implicits._ + val middle = width / 2 + val selectExpr = (1 to width).map(i => s"value as c$i") + spark.range(values).map(_ => Random.nextLong).toDF() + .selectExpr(selectExpr: _*) + .repartition(files) // ensure at least `files` number of splits (but maybe more) + .createOrReplaceTempView("t1") + + prepareTable(dir, spark.sql("SELECT * FROM t1")) + + benchmark.addCase("Select of all columns") { _ => + spark.sql(s"SELECT * FROM avroTable").noop() + } + + benchmark.run() + } + } + + } + private def filtersPushdownBenchmark(rowsNum: Int, numIters: Int): Unit = { val benchmark = new Benchmark("Filters pushdown", rowsNum, output = output) val colsNum = 100 @@ -265,6 +291,11 @@ object AvroReadBenchmark extends SqlBasedBenchmark { stringWithNullsScanBenchmark(1024 * 1024 * 10, fractionOfNulls) } } + + runBenchmark("Select All From Wide Columns") { + wideColumnsBenchmark(500000, 1000, 20) + } + runBenchmark("Single Column Scan From Wide Columns") { columnsBenchmark(1024 * 1024 * 1, 100) columnsBenchmark(1024 * 1024 * 1, 200) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala index 0b11434..7f9febb 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala +++ 
b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala @@ -17,6 +17,11 @@ package org.apache.spark.sql.execution.benchmark +import scala.util.Random + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.storage.StorageLevel + /** * Benchmark to measure Avro data sources write performance. * To run this benchmark: @@ -31,7 +36,34 @@ package org.apache.spark.sql.execution.benchmark * }}} */ object AvroWriteBenchmark extends DataSourceWriteBenchmark { + private def wideColumnsBenchmark: Unit = { + import spark.implicits._ + + withTempPath { dir => + withTempTable("t1") { + val width = 1000 + val values = 500000 + val files = 20 + val selectExpr = (1 to width).map(i => s"value as c$i") + // repartition to ensure we will write multiple files + val df = spark.range(values) + .map(_ => Random.nextInt).selectExpr(selectExpr: _*).repartition(files) + .persist(StorageLevel.DISK_ONLY) + // cache the data to ensure we are not benchmarking range or repartition + df.noop() + df.createOrReplaceTempView("t1") + val benchmark = new Benchmark(s"Write wide rows into $files files", values, output = output) + benchmark.addCase("Write wide rows") { _ => + spark.sql("SELECT * FROM t1"). + write.format("avro").save(s"${dir.getCanonicalPath}/${Random.nextLong.abs}") + } + benchmark.run() + } + } + } + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { runDataSourceBenchmark("Avro") + wideColumnsBenchmark } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org