Repository: spark
Updated Branches:
  refs/heads/branch-2.1 04fbb9e09 -> 4b4c3bf3f


[SPARK-19748][SQL] refresh function performs cache invalidation and regeneration of the in-memory state in the wrong order for InMemoryFileIndex with FileStatusCache

## What changes were proposed in this pull request?

If we refresh an InMemoryFileIndex that is backed by a FileStatusCache, it first uses the FileStatusCache to regenerate cachedLeafFiles etc., and only then calls FileStatusCache.invalidateAll.

This order is wrong: the in-memory state is rebuilt from the still-stale cache before the cache is invalidated, so the refresh takes no effect.

```
  override def refresh(): Unit = {
    refresh0()                      // rebuilds the in-memory state from the (still stale) cache
    fileStatusCache.invalidateAll() // invalidated too late: the stale listing was already read above
  }

  private def refresh0(): Unit = {
    // listLeafFiles consults fileStatusCache, so before the fix it returned the cached listing here
    val files = listLeafFiles(rootPaths)
    cachedLeafFiles =
      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
    cachedPartitionSpec = null
  }
```
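The fix, reflected in the diff below, simply swaps the two calls so that stale cache entries are dropped before the in-memory state is rebuilt:

```
  override def refresh(): Unit = {
    fileStatusCache.invalidateAll() // drop stale entries first
    refresh0()                      // listLeafFiles now re-lists the filesystem and re-populates the cache
  }
```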
## How was this patch tested?
A unit test was added to FileIndexSuite (see the diff below): it creates a file after the index has been built and verifies that refresh() picks it up.
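
A minimal sketch of the regression scenario, condensed from the new test below (the SparkSession setup and java.nio file writing stand in for the suite's fixtures and its stringToFile helper):

```
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}

val spark = SparkSession.builder().master("local").getOrCreate()
val dir = Files.createTempDirectory("refresh-test").toFile

// Build the index while the directory is still empty; the empty listing is cached.
val fileStatusCache = FileStatusCache.getOrCreate(spark)
val catalog = new InMemoryFileIndex(
  spark, Seq(new Path(dir.getAbsolutePath)), Map.empty, None, fileStatusCache)

// Create a file after the index was built; the index does not see it yet.
Files.write(new File(dir, "text.txt").toPath, "text".getBytes(StandardCharsets.UTF_8))
assert(catalog.allFiles().isEmpty)

// Before this patch, refresh() rebuilt the index from the stale cache and the
// listing stayed empty; with the fix it picks up the new file.
catalog.refresh()
assert(catalog.allFiles().nonEmpty)
```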

Author: windpiger <song...@outlook.com>

Closes #17079 from windpiger/fixInMemoryFileIndexRefresh.

(cherry picked from commit a350bc16d36c58b48ac01f0258678ffcdb77e793)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b4c3bf3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b4c3bf3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b4c3bf3

Branch: refs/heads/branch-2.1
Commit: 4b4c3bf3f78635d53ff983eabe37a4032947b499
Parents: 04fbb9e
Author: windpiger <song...@outlook.com>
Authored: Tue Feb 28 00:16:49 2017 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Feb 28 00:17:00 2017 -0800

----------------------------------------------------------------------
 .../datasources/InMemoryFileIndex.scala         |  2 +-
 .../execution/datasources/FileIndexSuite.scala  | 26 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4b4c3bf3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 7531f0a..ee4d086 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -66,8 +66,8 @@ class InMemoryFileIndex(
   }
 
   override def refresh(): Unit = {
-    refresh0()
     fileStatusCache.invalidateAll()
+    refresh0()
   }
 
   private def refresh0(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4b4c3bf3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index b7a472b..c638f5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -177,6 +177,32 @@ class FileIndexSuite extends SharedSQLContext {
       assert(catalog2.allFiles().nonEmpty)
     }
   }
+
+  test("refresh for InMemoryFileIndex with FileStatusCache") {
+    withTempDir { dir =>
+      val fileStatusCache = FileStatusCache.getOrCreate(spark)
+      val dirPath = new Path(dir.getAbsolutePath)
+      val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
+      val catalog =
+        new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, None, fileStatusCache) {
+          def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
+          def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
+        }
+
+      val file = new File(dir, "text.txt")
+      stringToFile(file, "text")
+      assert(catalog.leafDirPaths.isEmpty)
+      assert(catalog.leafFilePaths.isEmpty)
+
+      catalog.refresh()
+
+      assert(catalog.leafFilePaths.size == 1)
+      assert(catalog.leafFilePaths.head == fs.makeQualified(new Path(file.getAbsolutePath)))
+
+      assert(catalog.leafDirPaths.size == 1)
+      assert(catalog.leafDirPaths.head == fs.makeQualified(dirPath))
+    }
+  }
 }
 
 class FakeParentPathFileSystem extends RawLocalFileSystem {

