[
https://issues.apache.org/jira/browse/SPARK-52032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun updated SPARK-52032:
----------------------------------
Summary: ORC filter pushdown causes incorrect results with eqNullSafe (<=>)
in DataFrame filter (was: ORC filter pushdown causes incorrect results with
eqNullSafe (<=>) in DataFrame filter (Parquet works as expected))
> ORC filter pushdown causes incorrect results with eqNullSafe (<=>) in
> DataFrame filter
> --------------------------------------------------------------------------------------
>
> Key: SPARK-52032
> URL: https://issues.apache.org/jira/browse/SPARK-52032
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.5.5
> Reporter: Masafumi Ito
> Assignee: Dongjoon Hyun
> Priority: Critical
> Labels: pull-request-available
> Fix For: 4.0.1, 3.5.7
>
>
> h1. Overview of the Issue
> When a filter using `eqNullSafe` (`<=>`, the null-safe equality operator) is
> applied to a DataFrame read from an ORC file, rows where the compared column
> is null are incorrectly excluded, even though they should be retained. The
> same operation on Parquet files produces the expected results, and ORC also
> behaves correctly once filter pushdown is disabled.
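> For context, `<=>` differs from `=` in that it never evaluates to null: `null <=> "dummy"` is `false` (not `null`), so `NOT (name <=> "dummy")` is `true` for a null `name`, and that row must be retained. A minimal sketch of the expected semantics (spark-shell, in-memory DataFrame, so no file-source pushdown is involved):
> {code:java}
> import org.apache.spark.sql.functions.col
> import spark.implicits._
>
> val df = Seq((1, Option("test")), (2, Option.empty[String])).toDF("id", "name")
>
> // <=> is null-safe: comparing a null name to "dummy" yields false, never null,
> // so the negation is true and both rows survive the filter.
> df.filter(!(col("name") <=> "dummy")).show()
> // +---+----+
> // | id|name|
> // +---+----+
> // |  1|test|
> // |  2|null|
> // +---+----+
> {code}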
> h1. Detailed Explanation
> h2. Steps to Reproduce
> 1. Prepare a DataFrame (created from an ORC file) with the following data:
> {code:java}
> +---+----+-------------------+
> | id|name| email|
> +---+----+-------------------+
> | 1|test| null|
> | 2|null|[email protected]|
> +---+----+-------------------+{code}
> 2. Apply the following filter operation to this DataFrame (`df`):
> {code:java}
> df.filter(!(col("name") <=> "dummy")){code}
> 3. *Expected result:* All rows should be retained, as shown below.
> {code:java}
> +---+----+-------------------+
> | id|name| email|
> +---+----+-------------------+
> | 1|test| null|
> | 2|null|[email protected]|
> +---+----+-------------------+{code}
> 4. *Actual result:* The row where `name` is `null` is excluded.
> {code:java}
> +---+----+-------------------+
> | id|name| email|
> +---+----+-------------------+
> | 1|test| null|
> +---+----+-------------------+{code}
> h2. Additional Information
> * With DataFrames created from Parquet files, the same filter operation
> produces the expected results.
> * Disabling ORC filter pushdown (`spark.sql.orc.filterPushdown`) restores
> correct behavior for ORC as well (a runtime toggle is sketched at the end of
> this section).
> * The Physical Plan is the same for both ORC and Parquet. In both formats,
> the condition `NOT (name <=> dummy)` appears in DataFilters and PushedFilters
> in the plan.
> h3. ORC Physical Plan
> {code:java}
> == Physical Plan ==
> *(1) Filter NOT (name#36 <=> dummy)
> +- *(1) ColumnarToRow
>    +- FileScan orc [id#35,name#36,email#37] Batched: true, DataFilters: [NOT (name#36 <=> dummy)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/***/spark-null-safe/data/null_safe_..., PartitionFilters: [], PushedFilters: [Not(EqualNullSafe(name,dummy))], ReadSchema: struct<id:int,name:string,email:string>
> {code}
> h3. Parquet Physical Plan
> {code:java}
> == Physical Plan ==
> *(1) Filter NOT (name#42 <=> dummy)
> +- *(1) ColumnarToRow
>    +- FileScan parquet [id#41,name#42,email#43] Batched: true, DataFilters: [NOT (name#42 <=> dummy)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/***/spark-null-safe/data/null_safe_..., PartitionFilters: [], PushedFilters: [Not(EqualNullSafe(name,dummy))], ReadSchema: struct<id:int,name:string,email:string>
> {code}
> Thus, although the physical query plans are identical, only ORC produces
> incorrect filter results.
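> Because the pushdown toggle is a runtime SQL conf, the correlation is easy to confirm within the same session. A minimal sketch (reusing `dataPath` from the reproduction code below; the path is re-read so the scan is planned after the conf change):
> {code:java}
> // Runtime toggle for ORC predicate pushdown (default: true).
> spark.conf.set("spark.sql.orc.filterPushdown", "false")
>
> // Re-read so the ORC scan is planned without pushed filters.
> val orcDfNoPushdown = spark.read.orc(s"$dataPath/orc")
> orcDfNoPushdown.filter(!(col("name") <=> "dummy")).show() // both rows returned
> {code}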
> h1. Reproduction Code
> {code:java}
> import org.apache.spark.sql.functions.col
> import org.apache.spark.sql.SparkSession
>
> object NullSafeTest {
>   private val projectDir = new java.io.File(".").getCanonicalPath
>   private val dataPath = s"$projectDir/data/null_safe_test"
>
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder
>       .appName("NullSafeTest")
>       .master("local[*]")
>       // .config("spark.sql.orc.filterPushdown", "false") // ORC works as expected without filter pushdown
>       .getOrCreate()
>     import spark.implicits._
>
>     // One row with a null email, one with a null name.
>     val df = Seq(
>       (1, "test", null),
>       (2, null, "[email protected]")
>     ).toDF("id", "name", "email")
>     df.show()
>
>     // Write the same data in both formats, then read it back.
>     df.write.mode("overwrite").orc(s"$dataPath/orc")
>     df.write.mode("overwrite").parquet(s"$dataPath/parquet")
>     val orcDf = spark.read.orc(s"$dataPath/orc")
>     val parquetDf = spark.read.parquet(s"$dataPath/parquet")
>
>     // Identical null-safe filter on both sources.
>     val filteredOrcDf = orcDf.filter(!(col("name") <=> "dummy"))
>     filteredOrcDf.show()
>     filteredOrcDf.explain("extended")
>
>     val filteredParquetDf = parquetDf.filter(!(col("name") <=> "dummy"))
>     filteredParquetDf.show()
>     filteredParquetDf.explain("extended")
>
>     spark.stop()
>   }
> }
> {code}
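> The reproduction condenses into a self-checking form (a sketch that asserts on row counts instead of eyeballing `show()` output):
> {code:java}
> val orcRows = spark.read.orc(s"$dataPath/orc")
>   .filter(!(col("name") <=> "dummy")).count()
> val parquetRows = spark.read.parquet(s"$dataPath/parquet")
>   .filter(!(col("name") <=> "dummy")).count()
>
> // Expected: both counts are 2. With the bug, orcRows is 1 while parquetRows is 2.
> assert(parquetRows == 2, s"Parquet returned $parquetRows rows")
> assert(orcRows == 2, s"ORC returned $orcRows rows (null row dropped by the pushed-down filter)")
> {code}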
> h1. Summary
> * When using ORC files with filter pushdown enabled, filtering with
> `eqNullSafe` (`<=>`) unexpectedly excludes rows containing null values.
> * This issue does not occur with Parquet, or with ORC when filter pushdown
> is disabled.
> * There may be a bug in how ORC filter pushdown handles null values.
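> One plausible mechanism, offered as a hypothesis rather than a confirmed reading of the ORC code path: if the pushed-down `Not(EqualNullSafe(name,dummy))` were evaluated with ordinary null-propagating equality, a null `name` would make the inner comparison null, `NOT null` would stay null, and the row would be filtered out. Modeled with `Option[Boolean]` as the three-valued domain:
> {code:java}
> // Three-valued logic: Some(true), Some(false), None (= SQL null).
> def eq(a: Option[String], b: String): Option[Boolean] = a.map(_ == b)               // null-propagating `=`
> def eqNullSafe(a: Option[String], b: String): Option[Boolean] = Some(a.contains(b)) // `<=>`, never null
> def not(v: Option[Boolean]): Option[Boolean] = v.map(!_)
>
> val name: Option[String] = None // the null row
> not(eqNullSafe(name, "dummy"))  // Some(true) -> row kept (correct semantics)
> not(eq(name, "dummy"))          // None       -> row dropped (matches the buggy ORC result)
> {code}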