This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d9ce141c501 [SPARK-44199] CacheManager refreshes the fileIndex 
unnecessarily
d9ce141c501 is described below

commit d9ce141c50183202dc34a3cd3f5a67060ed1a596
Author: Vihang Karajgaonkar <vihang.karajgaon...@databricks.com>
AuthorDate: Mon Jul 3 16:06:43 2023 -0700

    [SPARK-44199] CacheManager refreshes the fileIndex unnecessarily
    
    ### What changes were proposed in this pull request?
    The `CacheManager.refreshFileIndexIfNecessary` logic checks whether any of 
the fileIndex root paths starts with the input path, using a plain string 
prefix comparison. This is problematic when the input path and a root path 
share a prefix but the root path is not a subdirectory of the input path (for 
example, `/data/tbl` and `/data/tbl_backup`). In such cases, the CacheManager 
can unnecessarily refresh the fileIndex, which can fail the query if the 
current SparkSession does not have access to that root path.
    
    ### Why are the changes needed?
    Fixes a bug where queries that use cached DataFrames can fail if the 
cached path shares a prefix with a different, unrelated path.
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    Unit test
    
    Closes #41749 from vihangk1/master_cachemanager.
    
    Lead-authored-by: Vihang Karajgaonkar <vihang.karajgaon...@databricks.com>
    Co-authored-by: Vihang Karajgaonkar <vihan...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/catalog/Catalog.scala     |  5 +--
 .../apache/spark/sql/execution/CacheManager.scala  | 24 ++++++++++---
 .../org/apache/spark/sql/CacheManagerSuite.scala   | 40 ++++++++++++++++++++++
 3 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 93ff3059f62..13b199948e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -632,8 +632,9 @@ abstract class Catalog {
 
   /**
    * Invalidates and refreshes all the cached data (and the associated 
metadata) for any `Dataset`
-   * that contains the given data source path. Path matching is by prefix, 
i.e. "/" would invalidate
-   * everything that is cached.
+   * that contains the given data source path. Path matching is by checking 
for sub-directories,
+   * i.e. "/" would invalidate everything that is cached and "/test/parent" 
would invalidate
+   * everything that is a subdirectory of "/test/parent".
    *
    * @since 2.0.0
    */
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b1153d7a1e8..2afb82cdbc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -359,21 +359,37 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
   }
 
   /**
-   * Refresh the given [[FileIndex]] if any of its root paths starts with 
`qualifiedPath`.
+   * Refresh the given [[FileIndex]] if any of its root paths is a subdirectory
+   * of the `qualifiedPath`.
    * @return whether the [[FileIndex]] is refreshed.
    */
   private def refreshFileIndexIfNecessary(
       fileIndex: FileIndex,
       fs: FileSystem,
       qualifiedPath: Path): Boolean = {
-    val prefixToInvalidate = qualifiedPath.toString
     val needToRefresh = fileIndex.rootPaths
-      .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory).toString)
-      .exists(_.startsWith(prefixToInvalidate))
+      .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
+      .exists(isSubDir(qualifiedPath, _))
     if (needToRefresh) fileIndex.refresh()
     needToRefresh
   }
 
+  /**
+   * Checks if the child path equals the parent path or is nested beneath it.
+   * @param qualifiedPathParent
+   *   Fully qualified parent path.
+   * @param qualifiedPathChild
+   *   Fully qualified child path.
+   * @return
+   *   True if the child path is the parent path or one of its descendants, else false.
+   */
+  def isSubDir(qualifiedPathParent: Path, qualifiedPathChild: Path): Boolean = 
{
+    Iterator
+      .iterate(qualifiedPathChild)(_.getParent)
+      .takeWhile(_ != null)
+      .exists(_.equals(qualifiedPathParent))
+  }
+
   /**
    * If CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is enabled, just return 
original session.
    */
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CacheManagerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CacheManagerSuite.scala
new file mode 100644
index 00000000000..fb8e82dbf90
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CacheManagerSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSparkSession
+
+class CacheManagerSuite extends SparkFunSuite with SharedSparkSession {
+
+  test("SPARK-44199: isSubDirectory tests") {
+    val cacheManager = spark.sharedState.cacheManager
+    val testCases = Map[(String, String), Boolean](
+      ("s3://bucket/a/b", "s3://bucket/a/b/c") -> true,
+      ("s3://bucket/a/b/c", "s3://bucket/a/b/c") -> true,
+      ("s3://bucket/a/b/c", "s3://bucket/a/b") -> false,
+      ("s3://bucket/a/z/c", "s3://bucket/a/b/c") -> false,
+      ("s3://bucket/a/b/c", "abfs://bucket/a/b/c") -> false)
+    testCases.foreach { test =>
+      val result = cacheManager.isSubDir(new Path(test._1._1), new 
Path(test._1._2))
+      assert(result == test._2)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to