This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 7c69614f067 [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns
7c69614f067 is described below

commit 7c69614f067c9eb68d997e8881d9b5845cde00fd
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Sun Aug 21 18:59:48 2022 +0900

    [SPARK-39833][SQL] Disable Parquet column index in DSv1 to fix a correctness issue in the case of overlapping partition and data columns
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a correctness issue in the Parquet DSv1 FileFormat when the projection does not contain the columns referenced in pushed filters. This typically happens when partition columns and data columns overlap.
    
    This could result in an empty result when in fact there were records matching the predicate, as can be seen in the provided tests.
    
    The problem is especially visible when `count()` and `show()` report different results: for example, `show()` would return one or more records while `count()` would return 0.
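    For illustration only (not part of this patch), a minimal sketch of the symptom, assuming a running local `SparkSession` and a hypothetical `/tmp/spark-39833` path, where the partition column `col` overlaps the data column `COL`:
    
        import org.apache.spark.sql.SparkSession
    
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
    
        // Write one Parquet file under a Hive-style partition directory so that
        // the partition column `col` overlaps the data column `COL`.
        val dir = "/tmp/spark-39833"  // hypothetical path
        Seq(0).toDF("COL").coalesce(1).write.parquet(s"$dir/col=0/")
    
        val df = spark.read.parquet(dir)
        df.filter("col = 0").show()           // shows the matching row
        println(df.filter("col = 0").count()) // before this fix, could report 0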
    
    In Parquet, when a predicate is provided and the column index is enabled, we try to filter row ranges to figure out what the count should be. Unfortunately, there is an issue: if the projection is empty or is not in the set of filter columns, any checks on those columns fail and 0 rows are returned (`RowRanges.EMPTY`) even though there is data matching the filter.
    
    Note that this is a mitigation, a quick fix, rather than a complete solution. The actual fix needs to go into Parquet-MR: https://issues.apache.org/jira/browse/PARQUET-2170.
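    Until then, a hedged workaround sketch for unpatched builds: set the same flag this patch touches (`parquet.filter.columnindex.enabled`, the value of `ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED`) to false yourself, for example through the `spark.hadoop.*` passthrough:
    
        import org.apache.spark.sql.SparkSession
    
        // Workaround sketch for unpatched versions: the spark.hadoop.* prefix is
        // copied into the Hadoop configuration used by the Parquet readers.
        val spark = SparkSession.builder()
          .master("local[*]")
          .config("spark.hadoop.parquet.filter.columnindex.enabled", "false")
          .getOrCreate()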
    
    The fix is not required in DSv2 where the overlapping columns are removed in `FileScanBuilder::readDataSchema()`.
    
    ### Why are the changes needed?
    
    Fixes a correctness issue in Parquet DSv1 when the projection columns are not referenced by the columns in pushed-down filters, or when the required schema is empty.
    
    Downsides: Parquet column index filtering will be disabled unless it has been explicitly enabled, which could affect performance.
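    Conversely, because the patch uses `setBooleanIfUnset`, an explicit setting is respected. A sketch of how a user could keep the optimisation on, assuming a `SparkSession` named `spark` is in scope (the per-read option route assumes data source options are merged into the Hadoop configuration for the scan):
    
        // An explicit value wins over setBooleanIfUnset, keeping the optimisation on.
        spark.sparkContext.hadoopConfiguration
          .setBoolean("parquet.filter.columnindex.enabled", true)
    
        // Or per read, via a data source option.
        val dfWithIndex = spark.read
          .option("parquet.filter.columnindex.enabled", "true")
          .parquet("/tmp/spark-39833")  // hypothetical path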
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I added a unit test that reproduces this behaviour. The test fails without the fix and passes with the fix.
    
    Closes #37419 from sadikovi/SPARK-39833.
    
    Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit cde71aaf173aadd14dd6393b09e9851b5caad903)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../datasources/parquet/ParquetFileFormat.scala    |  5 +++++
 .../datasources/parquet/ParquetQuerySuite.scala    | 22 ++++++++++++++++++++++
 2 files changed, 27 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9765e7c7801..2fa0854c983 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -230,6 +230,11 @@ class ParquetFileFormat
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
+    // See PARQUET-2170.
+    // Disable column index optimisation when required schema does not have columns that appear in
+    // pushed filters to avoid getting incorrect results.
+    hadoopConf.setBooleanIfUnset(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, false)
+
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 33656c84c88..d0a9a93b00f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -1065,6 +1065,28 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
       }
     }
   }
+
+  test("SPARK-39833: pushed filters with count()") {
+    withTempPath { path =>
+      val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+      Seq(0).toDF("COL").coalesce(1).write.save(p)
+      val df = spark.read.parquet(path.getCanonicalPath)
+      checkAnswer(df.filter("col = 0"), Seq(Row(0)))
+      assert(df.filter("col = 0").count() == 1, "col")
+      assert(df.filter("COL = 0").count() == 1, "COL")
+    }
+  }
+
+  test("SPARK-39833: pushed filters with project without filter columns") {
+    withTempPath { path =>
+      val p = s"${path.getCanonicalPath}${File.separator}col=0${File.separator}"
+      Seq((0, 1)).toDF("COL", "a").coalesce(1).write.save(p)
+      val df = spark.read.parquet(path.getCanonicalPath)
+      checkAnswer(df.filter("col = 0"), Seq(Row(0, 1)))
+      assert(df.filter("col = 0").select("a").collect().toSeq == Row(1) :: Nil)
+      assert(df.filter("col = 0 and a = 1").select("a").collect().toSeq == 
Row(1) :: Nil)
+    }
+  }
 }
 
 class ParquetV2QuerySuite extends ParquetQuerySuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
