This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 491f9d586ae6 [SPARK-52339][SQL] Fix comparison of `InMemoryFileIndex` 
instances
491f9d586ae6 is described below

commit 491f9d586ae68b5f7859ce8c3c225ca676eb7340
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Mon Jun 23 14:21:00 2025 +0800

    [SPARK-52339][SQL] Fix comparison of `InMemoryFileIndex` instances
    
    ### What changes were proposed in this pull request?
    
    This PR changes `InMemoryFileIndex#equals` to compare the sorted sequence 
of root paths (duplicates preserved) rather than the distinct set of root 
paths. Without this change, `InMemoryFileIndex#equals` considers the following 
two collections of root paths to be equal, even though they represent a 
different number of rows:
    ```
    ["/tmp/test", "/tmp/test"]
    ["/tmp/test", "/tmp/test", "/tmp/test"]
    ```
    
    ### Why are the changes needed?
    
    The bug can cause correctness issues, e.g.
    ```
    // create test data
    val data = Seq((1, 2), (2, 3)).toDF("a", "b")
    data.write.mode("overwrite").csv("/tmp/test")
    
    val fileList1 = List.fill(2)("/tmp/test")
    val fileList2 = List.fill(3)("/tmp/test")
    
    val df1 = spark.read.schema("a int, b int").csv(fileList1: _*)
    val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)
    
    df1.count() // correctly returns 4
    df2.count() // correctly returns 6
    
    // the following is the same as above, except df1 is persisted
    val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist
    val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)
    
    df1.count() // correctly returns 4
    df2.count() // incorrectly returns 4!!
    ```
    In the above example, df1 and df2 were created with a different number of 
paths: df1 has 2, and df2 has 3. But since the distinct set of root paths is 
the same (e.g., `Set("/tmp/test") == Set("/tmp/test")`), the two dataframes are 
considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan.
    
    The same bug also causes inappropriate exchange reuse.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51043 from bersprockets/multi_path_issue.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit ccded6c21b28e6579df93b7f25a6c00c056f3d09)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/datasources/InMemoryFileIndex.scala  |  2 +-
 .../sql/execution/datasources/FileIndexSuite.scala | 24 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 3b8a20c7cf74..ec258e2e4645 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -103,7 +103,7 @@ class InMemoryFileIndex(
   }
 
   override def equals(other: Any): Boolean = other match {
-    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case hdfs: InMemoryFileIndex => rootPaths.sorted == hdfs.rootPaths.sorted
     case _ => false
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 33b4cc1d2e7f..1150f6163b97 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -656,6 +656,30 @@ class FileIndexSuite extends SharedSparkSession {
     assert(FileIndexOptions.isValidOption("modifiedafter"))
     assert(FileIndexOptions.isValidOption("pathglobfilter"))
   }
+
+  test("SPARK-52339: Correctly compare root paths") {
+    withTempDir { dir =>
+      val file1 = new File(dir, "text1.txt")
+      stringToFile(file1, "text1")
+      val file2 = new File(dir, "text2.txt")
+      stringToFile(file2, "text2")
+      val path1 = new Path(file1.getCanonicalPath)
+      val path2 = new Path(file2.getCanonicalPath)
+
+      val schema = StructType(Seq(StructField("a", StringType, false)))
+
+      // Verify that the order of paths doesn't matter
+      val fileIndex1a = new InMemoryFileIndex(spark, Seq(path1, path2), 
Map.empty, Some(schema))
+      val fileIndex1b = new InMemoryFileIndex(spark, Seq(path2, path1), 
Map.empty, Some(schema))
+      assert(fileIndex1a == fileIndex1b)
+
+      // Verify that a different number of paths does matter
+      val fileIndex2a = new InMemoryFileIndex(spark, Seq(path1, path1), 
Map.empty, Some(schema))
+      val fileIndex2b = new InMemoryFileIndex(spark, Seq(path1, path1, path1),
+        Map.empty, Some(schema))
+      assert(fileIndex2a != fileIndex2b)
+    }
+  }
 }
 
 object DeletionRaceFileSystem {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to