This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 212ca86  [SPARK-31785][SQL][TESTS] Add a helper function to test all parquet readers
212ca86 is described below

commit 212ca86b8c2e7671e4980ea6c2b869286a4dac06
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Fri May 22 09:53:35 2020 +0900

    [SPARK-31785][SQL][TESTS] Add a helper function to test all parquet readers

    ### What changes were proposed in this pull request?
    Add `withAllParquetReaders` to `ParquetTest`. The function allows running a block of code for all available Parquet readers.

    ### Why are the changes needed?
    1. It simplifies tests.
    2. It allows testing all Parquet readers that could be available in projects based on Apache Spark.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    By running the affected test suites.

    Closes #28598 from MaxGekk/add-withAllParquetReaders.

    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit 60118a242639df060a9fdcaa4f14cd072ea3d056)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../datasources/parquet/ParquetFilterSuite.scala   |  39 +++---
 .../datasources/parquet/ParquetIOSuite.scala       | 144 ++++++++++-----------
 .../parquet/ParquetInteroperabilitySuite.scala     |   8 +-
 .../datasources/parquet/ParquetQuerySuite.scala    |  30 ++---
 .../datasources/parquet/ParquetTest.scala          |   7 +
 5 files changed, 106 insertions(+), 122 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 5cf2129..7b33cef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -781,10 +781,9 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   test("Filter applied on merged Parquet schema with new column should work") {
     import testImplicits._
 
-    Seq("true", "false").foreach { vectorized =>
+    withAllParquetReaders {
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
-        SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
         withTempPath { dir =>
           val path1 = s"${dir.getCanonicalPath}/table1"
           (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path1)
@@ -1219,24 +1218,22 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }
 
   test("SPARK-17213: Broken Parquet filter push-down for string columns") {
-    Seq(true, false).foreach { vectorizedEnabled =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedEnabled.toString) {
-        withTempPath { dir =>
-          import testImplicits._
+    withAllParquetReaders {
+      withTempPath { dir =>
+        import testImplicits._
 
-          val path = dir.getCanonicalPath
-          // scalastyle:off nonascii
-          Seq("a", "é").toDF("name").write.parquet(path)
-          // scalastyle:on nonascii
+        val path = dir.getCanonicalPath
+        // scalastyle:off nonascii
+        Seq("a", "é").toDF("name").write.parquet(path)
+        // scalastyle:on nonascii
 
-          assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
-          assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
+        assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
+        assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
 
-          // scalastyle:off nonascii
-          assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
-          assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
-          // scalastyle:on nonascii
-        }
+        // scalastyle:off nonascii
+        assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
+        assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
+        // scalastyle:on nonascii
       }
     }
   }
@@ -1244,8 +1241,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names") {
     import testImplicits._
 
-    Seq(true, false).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
+    withAllParquetReaders {
+      withSQLConf(
         SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> true.toString,
         SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") {
         withTempPath { path =>
@@ -1255,7 +1252,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
         }
       }
 
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
+      withSQLConf(
         // Makes sure disabling 'spark.sql.parquet.recordFilter' still enables
         // row group level filtering.
         SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> "false",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 87b4db3..f075d04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -647,47 +647,39 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("read dictionary encoded decimals written as INT32") {
-    ("true" :: "false" :: Nil).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
-        checkAnswer(
-          // Decimal column in this file is encoded using plain dictionary
-          readResourceParquetFile("test-data/dec-in-i32.parquet"),
-          spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
-      }
+    withAllParquetReaders {
+      checkAnswer(
+        // Decimal column in this file is encoded using plain dictionary
+        readResourceParquetFile("test-data/dec-in-i32.parquet"),
+        spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
     }
   }
 
   test("read dictionary encoded decimals written as INT64") {
-    ("true" :: "false" :: Nil).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
-        checkAnswer(
-          // Decimal column in this file is encoded using plain dictionary
-          readResourceParquetFile("test-data/dec-in-i64.parquet"),
-          spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
-      }
+    withAllParquetReaders {
+      checkAnswer(
+        // Decimal column in this file is encoded using plain dictionary
+        readResourceParquetFile("test-data/dec-in-i64.parquet"),
+        spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
     }
   }
 
   test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
-    ("true" :: "false" :: Nil).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
-        checkAnswer(
-          // Decimal column in this file is encoded using plain dictionary
-          readResourceParquetFile("test-data/dec-in-fixed-len.parquet"),
-          spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
-      }
+    withAllParquetReaders {
+      checkAnswer(
+        // Decimal column in this file is encoded using plain dictionary
+        readResourceParquetFile("test-data/dec-in-fixed-len.parquet"),
+        spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
     }
   }
 
   test("read dictionary and plain encoded timestamp_millis written as INT64") {
-    ("true" :: "false" :: Nil).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
-        checkAnswer(
-          // timestamp column in this file is encoded using combination of plain
-          // and dictionary encodings.
-          readResourceParquetFile("test-data/timemillis-in-i64.parquet"),
-          (1 to 3).map(i => Row(new java.sql.Timestamp(10))))
-      }
+    withAllParquetReaders {
+      checkAnswer(
+        // timestamp column in this file is encoded using combination of plain
+        // and dictionary encodings.
+        readResourceParquetFile("test-data/timemillis-in-i64.parquet"),
+        (1 to 3).map(i => Row(new java.sql.Timestamp(10))))
     }
   }
 
@@ -943,23 +935,21 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       }
     }
 
-    Seq(false, true).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-        checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
-        checkReadMixedFiles(
-          "before_1582_timestamp_micros_v2_4.snappy.parquet",
-          "TIMESTAMP_MICROS",
-          "1001-01-01 01:02:03.123456")
-        checkReadMixedFiles(
-          "before_1582_timestamp_millis_v2_4.snappy.parquet",
-          "TIMESTAMP_MILLIS",
-          "1001-01-01 01:02:03.123")
-
-        // INT96 is a legacy timestamp format and we always rebase the seconds for it.
-        checkAnswer(readResourceParquetFile(
-          "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
-          Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-      }
+    withAllParquetReaders {
+      checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
+      checkReadMixedFiles(
+        "before_1582_timestamp_micros_v2_4.snappy.parquet",
+        "TIMESTAMP_MICROS",
+        "1001-01-01 01:02:03.123456")
+      checkReadMixedFiles(
+        "before_1582_timestamp_millis_v2_4.snappy.parquet",
+        "TIMESTAMP_MILLIS",
+        "1001-01-01 01:02:03.123")
+
+      // INT96 is a legacy timestamp format and we always rebase the seconds for it.
+      checkAnswer(readResourceParquetFile(
+        "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
     }
   }
 
@@ -984,27 +974,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
           .parquet(path)
       }
 
-      Seq(false, true).foreach { vectorized =>
-        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-          // The file metadata indicates if it needs rebase or not, so we can always get the
-          // correct result regardless of the "rebase mode" config.
-          Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-            withSQLConf(
-              SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) {
-              checkAnswer(
-                spark.read.parquet(path),
-                Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
-            }
-          }
-
-          // Force to not rebase to prove the written datetime values are rebased
-          // and we will get wrong result if we don't rebase while reading.
- withSQLConf("spark.test.forceNoRebase" -> "true") { + withAllParquetReaders { + // The file metadata indicates if it needs rebase or not, so we can always get the + // correct result regardless of the "rebase mode" config. + Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => + withSQLConf( + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer( spark.read.parquet(path), - Seq.tabulate(N)(_ => Row(Timestamp.valueOf(nonRebased)))) + Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr)))) } } + + // Force to not rebase to prove the written datetime values are rebased + // and we will get wrong result if we don't rebase while reading. + withSQLConf("spark.test.forceNoRebase" -> "true") { + checkAnswer( + spark.read.parquet(path), + Seq.tabulate(N)(_ => Row(Timestamp.valueOf(nonRebased)))) + } } } } @@ -1027,26 +1015,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession .parquet(path) } - Seq(false, true).foreach { vectorized => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { - // The file metadata indicates if it needs rebase or not, so we can always get the - // correct result regardless of the "rebase mode" config. - Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) { - checkAnswer( - spark.read.parquet(path), - Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01")))) - } - } - - // Force to not rebase to prove the written datetime values are rebased and we will get - // wrong result if we don't rebase while reading. - withSQLConf("spark.test.forceNoRebase" -> "true") { + withAllParquetReaders { + // The file metadata indicates if it needs rebase or not, so we can always get the + // correct result regardless of the "rebase mode" config. + Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer( spark.read.parquet(path), - Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07")))) + Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01")))) } } + + // Force to not rebase to prove the written datetime values are rebased and we will get + // wrong result if we don't rebase while reading. 
+ withSQLConf("spark.test.forceNoRebase" -> "true") { + checkAnswer( + spark.read.parquet(path), + Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07")))) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index 7d75077..a14f641 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -124,12 +124,11 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS FileUtils.copyFile(new File(impalaPath), new File(tableDir, "part-00001.parq")) Seq(false, true).foreach { int96TimestampConversion => - Seq(false, true).foreach { vectorized => + withAllParquetReaders { withSQLConf( (SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, SQLConf.ParquetOutputTimestampType.INT96.toString), - (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString()), - (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString()) + (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString()) ) { val readBack = spark.read.parquet(tableDir.getAbsolutePath).collect() assert(readBack.size === 6) @@ -149,7 +148,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray val actual = readBack.map(_.getTimestamp(0).toString).sorted withClue( - s"int96TimestampConversion = $int96TimestampConversion; vectorized = $vectorized") { + s"int96TimestampConversion = $int96TimestampConversion; " + + s"vectorized = ${SQLConf.get.parquetVectorizedReaderEnabled}") { assert(fullExpectations === actual) // Now test that the behavior is still correct even with a filter which could get diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 917aaba..05d305a9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -168,11 +168,9 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS withTempPath { file => val df = spark.createDataFrame(sparkContext.parallelize(data), schema) df.write.parquet(file.getCanonicalPath) - ("true" :: "false" :: Nil).foreach { vectorized => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) { - val df2 = spark.read.parquet(file.getCanonicalPath) - checkAnswer(df2, df.collect().toSeq) - } + withAllParquetReaders { + val df2 = spark.read.parquet(file.getCanonicalPath) + checkAnswer(df2, df.collect().toSeq) } } } @@ -791,15 +789,13 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } test("SPARK-26677: negated null-safe equality comparison should not filter matched row groups") { - (true :: false :: Nil).foreach { vectorized => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { - withTempPath { path => - // Repeated values for dictionary encoding. 
- Seq(Some("A"), Some("A"), None).toDF.repartition(1) - .write.parquet(path.getAbsolutePath) - val df = spark.read.parquet(path.getAbsolutePath) - checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df) - } + withAllParquetReaders { + withTempPath { path => + // Repeated values for dictionary encoding. + Seq(Some("A"), Some("A"), None).toDF.repartition(1) + .write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df) } } } @@ -821,10 +817,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> toTsType) { write(df2.write.mode(SaveMode.Append)) } - Seq("true", "false").foreach { vectorized => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) { - checkAnswer(readback, df1.unionAll(df2)) - } + withAllParquetReaders { + checkAnswer(readback, df1.unionAll(df2)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index c833d5f..f572697 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -162,4 +162,11 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest { protected def getResourceParquetFilePath(name: String): String = { Thread.currentThread().getContextClassLoader.getResource(name).toString } + + def withAllParquetReaders(code: => Unit): Unit = { + // test the row-based reader + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false")(code) + // test the vectorized reader + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org