This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 212ca86  [SPARK-31785][SQL][TESTS] Add a helper function to test all parquet readers
212ca86 is described below

commit 212ca86b8c2e7671e4980ea6c2b869286a4dac06
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Fri May 22 09:53:35 2020 +0900

    [SPARK-31785][SQL][TESTS] Add a helper function to test all parquet readers
    
    ### What changes were proposed in this pull request?
    Add `withAllParquetReaders` to `ParquetTest`. The function allows running a block of code for all available Parquet readers.
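    
    For illustration, this is the refactoring pattern applied throughout the diff below: a hand-written loop over the vectorized-reader flag collapses into a single call to the helper. This is a sketch only; `path` and `expected` are placeholders rather than names from this patch, and `spark`, `checkAnswer`, and `withSQLConf` come from the existing test harness.
    
    ```scala
    // Before: each test enumerated the reader flag by hand.
    Seq("true", "false").foreach { vectorized =>
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
        checkAnswer(spark.read.parquet(path), expected)
      }
    }
    
    // After: withAllParquetReaders runs the same block once per reader.
    withAllParquetReaders {
      checkAnswer(spark.read.parquet(path), expected)
    }
    ```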
    
    ### Why are the changes needed?
    1. It simplifies tests.
    2. It allows testing all Parquet readers that could be available in projects based on Apache Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    By running affected test suites.
    
    Closes #28598 from MaxGekk/add-withAllParquetReaders.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit 60118a242639df060a9fdcaa4f14cd072ea3d056)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../datasources/parquet/ParquetFilterSuite.scala   |  39 +++---
 .../datasources/parquet/ParquetIOSuite.scala       | 144 ++++++++++-----------
 .../parquet/ParquetInteroperabilitySuite.scala     |   8 +-
 .../datasources/parquet/ParquetQuerySuite.scala    |  30 ++---
 .../datasources/parquet/ParquetTest.scala          |   7 +
 5 files changed, 106 insertions(+), 122 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 5cf2129..7b33cef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -781,10 +781,9 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   test("Filter applied on merged Parquet schema with new column should work") {
     import testImplicits._
-    Seq("true", "false").foreach { vectorized =>
+    withAllParquetReaders {
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
-        SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
         withTempPath { dir =>
           val path1 = s"${dir.getCanonicalPath}/table1"
          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path1)
@@ -1219,24 +1218,22 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }
 
   test("SPARK-17213: Broken Parquet filter push-down for string columns") {
-    Seq(true, false).foreach { vectorizedEnabled =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedEnabled.toString) {
-        withTempPath { dir =>
-          import testImplicits._
+    withAllParquetReaders {
+      withTempPath { dir =>
+        import testImplicits._
 
-          val path = dir.getCanonicalPath
-          // scalastyle:off nonascii
-          Seq("a", "é").toDF("name").write.parquet(path)
-          // scalastyle:on nonascii
+        val path = dir.getCanonicalPath
+        // scalastyle:off nonascii
+        Seq("a", "é").toDF("name").write.parquet(path)
+        // scalastyle:on nonascii
 
-          assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
-          assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
+        assert(spark.read.parquet(path).where("name > 'a'").count() == 1)
+        assert(spark.read.parquet(path).where("name >= 'a'").count() == 2)
 
-          // scalastyle:off nonascii
-          assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
-          assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
-          // scalastyle:on nonascii
-        }
+        // scalastyle:off nonascii
+        assert(spark.read.parquet(path).where("name < 'é'").count() == 1)
+        assert(spark.read.parquet(path).where("name <= 'é'").count() == 2)
+        // scalastyle:on nonascii
       }
     }
   }
@@ -1244,8 +1241,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   test("SPARK-31026: Parquet predicate pushdown for fields having dots in the names") {
     import testImplicits._
 
-    Seq(true, false).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
+    withAllParquetReaders {
+      withSQLConf(
           SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> true.toString,
           SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") {
         withTempPath { path =>
@@ -1255,7 +1252,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
         }
       }
 
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
+      withSQLConf(
          // Makes sure disabling 'spark.sql.parquet.recordFilter' still enables
           // row group level filtering.
           SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> "false",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 87b4db3..f075d04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -647,47 +647,39 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("read dictionary encoded decimals written as INT32") {
-    ("true" :: "false" :: Nil).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
-        checkAnswer(
-          // Decimal column in this file is encoded using plain dictionary
-          readResourceParquetFile("test-data/dec-in-i32.parquet"),
-          spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
-      }
+    withAllParquetReaders {
+      checkAnswer(
+        // Decimal column in this file is encoded using plain dictionary
+        readResourceParquetFile("test-data/dec-in-i32.parquet"),
+        spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec))
     }
   }
 
   test("read dictionary encoded decimals written as INT64") {
-    ("true" :: "false" :: Nil).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
-        checkAnswer(
-          // Decimal column in this file is encoded using plain dictionary
-          readResourceParquetFile("test-data/dec-in-i64.parquet"),
-          spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
-      }
+    withAllParquetReaders {
+      checkAnswer(
+        // Decimal column in this file is encoded using plain dictionary
+        readResourceParquetFile("test-data/dec-in-i64.parquet"),
+        spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
     }
   }
 
   test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
-    ("true" :: "false" :: Nil).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
-        checkAnswer(
-          // Decimal column in this file is encoded using plain dictionary
-          readResourceParquetFile("test-data/dec-in-fixed-len.parquet"),
-          spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
-      }
+    withAllParquetReaders {
+      checkAnswer(
+        // Decimal column in this file is encoded using plain dictionary
+        readResourceParquetFile("test-data/dec-in-fixed-len.parquet"),
+        spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
     }
   }
 
   test("read dictionary and plain encoded timestamp_millis written as INT64") {
-    ("true" :: "false" :: Nil).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
-        checkAnswer(
-          // timestamp column in this file is encoded using combination of plain
-          // and dictionary encodings.
-          readResourceParquetFile("test-data/timemillis-in-i64.parquet"),
-          (1 to 3).map(i => Row(new java.sql.Timestamp(10))))
-      }
+    withAllParquetReaders {
+      checkAnswer(
+        // timestamp column in this file is encoded using combination of plain
+        // and dictionary encodings.
+        readResourceParquetFile("test-data/timemillis-in-i64.parquet"),
+        (1 to 3).map(i => Row(new java.sql.Timestamp(10))))
     }
   }
 
@@ -943,23 +935,21 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       }
     }
 
-    Seq(false, true).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-        checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
-        checkReadMixedFiles(
-          "before_1582_timestamp_micros_v2_4.snappy.parquet",
-          "TIMESTAMP_MICROS",
-          "1001-01-01 01:02:03.123456")
-        checkReadMixedFiles(
-          "before_1582_timestamp_millis_v2_4.snappy.parquet",
-          "TIMESTAMP_MILLIS",
-          "1001-01-01 01:02:03.123")
-
-        // INT96 is a legacy timestamp format and we always rebase the seconds for it.
-        checkAnswer(readResourceParquetFile(
-          "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
-          Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-      }
+    withAllParquetReaders {
+      checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01")
+      checkReadMixedFiles(
+        "before_1582_timestamp_micros_v2_4.snappy.parquet",
+        "TIMESTAMP_MICROS",
+        "1001-01-01 01:02:03.123456")
+      checkReadMixedFiles(
+        "before_1582_timestamp_millis_v2_4.snappy.parquet",
+        "TIMESTAMP_MILLIS",
+        "1001-01-01 01:02:03.123")
+
+      // INT96 is a legacy timestamp format and we always rebase the seconds for it.
+      checkAnswer(readResourceParquetFile(
+        "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
     }
   }
 
@@ -984,27 +974,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
                   .parquet(path)
               }
 
-              Seq(false, true).foreach { vectorized =>
-                withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-                  // The file metadata indicates if it needs rebase or not, so we can always get the
-                  // correct result regardless of the "rebase mode" config.
-                  Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-                    withSQLConf(
-                      SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) {
-                      checkAnswer(
-                        spark.read.parquet(path),
-                        Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
-                    }
-                  }
-
-                  // Force to not rebase to prove the written datetime values are rebased
-                  // and we will get wrong result if we don't rebase while reading.
-                  withSQLConf("spark.test.forceNoRebase" -> "true") {
+              withAllParquetReaders {
+                // The file metadata indicates if it needs rebase or not, so we can always get the
+                // correct result regardless of the "rebase mode" config.
+                Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+                  withSQLConf(
+                    SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) {
                     checkAnswer(
                       spark.read.parquet(path),
-                      Seq.tabulate(N)(_ => Row(Timestamp.valueOf(nonRebased))))
+                      Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
                   }
                 }
+
+                // Force to not rebase to prove the written datetime values are rebased
+                // and we will get wrong result if we don't rebase while reading.
+                withSQLConf("spark.test.forceNoRebase" -> "true") {
+                  checkAnswer(
+                    spark.read.parquet(path),
+                    Seq.tabulate(N)(_ => Row(Timestamp.valueOf(nonRebased))))
+                }
               }
             }
           }
@@ -1027,26 +1015,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
             .parquet(path)
         }
 
-        Seq(false, true).foreach { vectorized =>
-          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-            // The file metadata indicates if it needs rebase or not, so we can always get the
-            // correct result regardless of the "rebase mode" config.
-            Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
-              withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
-                checkAnswer(
-                  spark.read.parquet(path),
-                  Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
-              }
-            }
-
-            // Force to not rebase to prove the written datetime values are rebased and we will get
-            // wrong result if we don't rebase while reading.
-            withSQLConf("spark.test.forceNoRebase" -> "true") {
+        withAllParquetReaders {
+          // The file metadata indicates if it needs rebase or not, so we can always get the
+          // correct result regardless of the "rebase mode" config.
+          Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode =>
+            withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) {
               checkAnswer(
                 spark.read.parquet(path),
-                Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07"))))
+                Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
             }
           }
+
+          // Force to not rebase to prove the written datetime values are rebased and we will get
+          // wrong result if we don't rebase while reading.
+          withSQLConf("spark.test.forceNoRebase" -> "true") {
+            checkAnswer(
+              spark.read.parquet(path),
+              Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07"))))
+          }
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
index 7d75077..a14f641 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -124,12 +124,11 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
      FileUtils.copyFile(new File(impalaPath), new File(tableDir, "part-00001.parq"))
 
       Seq(false, true).foreach { int96TimestampConversion =>
-        Seq(false, true).foreach { vectorized =>
+        withAllParquetReaders {
           withSQLConf(
               (SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
                 SQLConf.ParquetOutputTimestampType.INT96.toString),
-              (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString()),
-              (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString())
+              (SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION.key, int96TimestampConversion.toString())
           ) {
            val readBack = spark.read.parquet(tableDir.getAbsolutePath).collect()
             assert(readBack.size === 6)
@@ -149,7 +148,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
            val fullExpectations = (ts ++ impalaExpectations).map(_.toString).sorted.toArray
             val actual = readBack.map(_.getTimestamp(0).toString).sorted
             withClue(
-              s"int96TimestampConversion = $int96TimestampConversion; vectorized = $vectorized") {
+              s"int96TimestampConversion = $int96TimestampConversion; " +
+              s"vectorized = ${SQLConf.get.parquetVectorizedReaderEnabled}") {
               assert(fullExpectations === actual)
 
              // Now test that the behavior is still correct even with a filter which could get
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 917aaba..05d305a9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -168,11 +168,9 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
       withTempPath { file =>
         val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
         df.write.parquet(file.getCanonicalPath)
-        ("true" :: "false" :: Nil).foreach { vectorized =>
-          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
-            val df2 = spark.read.parquet(file.getCanonicalPath)
-            checkAnswer(df2, df.collect().toSeq)
-          }
+        withAllParquetReaders {
+          val df2 = spark.read.parquet(file.getCanonicalPath)
+          checkAnswer(df2, df.collect().toSeq)
         }
       }
     }
@@ -791,15 +789,13 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
   }
 
   test("SPARK-26677: negated null-safe equality comparison should not filter matched row groups") {
-    (true :: false :: Nil).foreach { vectorized =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-        withTempPath { path =>
-          // Repeated values for dictionary encoding.
-          Seq(Some("A"), Some("A"), None).toDF.repartition(1)
-            .write.parquet(path.getAbsolutePath)
-          val df = spark.read.parquet(path.getAbsolutePath)
-          checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
-        }
+    withAllParquetReaders {
+      withTempPath { path =>
+        // Repeated values for dictionary encoding.
+        Seq(Some("A"), Some("A"), None).toDF.repartition(1)
+          .write.parquet(path.getAbsolutePath)
+        val df = spark.read.parquet(path.getAbsolutePath)
+        checkAnswer(stripSparkFilter(df.where("NOT (value <=> 'A')")), df)
       }
     }
   }
@@ -821,10 +817,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
         withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> toTsType) {
           write(df2.write.mode(SaveMode.Append))
         }
-        Seq("true", "false").foreach { vectorized =>
-          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
-            checkAnswer(readback, df1.unionAll(df2))
-          }
+        withAllParquetReaders {
+          checkAnswer(readback, df1.unionAll(df2))
         }
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index c833d5f..f572697 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -162,4 +162,11 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
   protected def getResourceParquetFilePath(name: String): String = {
     Thread.currentThread().getContextClassLoader.getResource(name).toString
   }
+
+  def withAllParquetReaders(code: => Unit): Unit = {
+    // test the row-based reader
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false")(code)
+    // test the vectorized reader
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
+  }
 }
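
A minimal usage sketch of the new helper, assuming a suite that mixes in the `ParquetTest` trait shown above (the test name, `path`, and `expected` values are illustrative, not part of this patch):

```scala
test("example: same assertion under both Parquet readers") {
  withAllParquetReaders {
    // withAllParquetReaders sets spark.sql.parquet.enableVectorizedReader to "false"
    // and then to "true" around this block, so the active reader can be read back from
    // SQLConf for clearer failure messages, as ParquetInteroperabilitySuite does above.
    withClue(s"vectorized = ${SQLConf.get.parquetVectorizedReaderEnabled}") {
      checkAnswer(spark.read.parquet(path), expected)
    }
  }
}
```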


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
