Repository: spark Updated Branches: refs/heads/branch-2.1 04fbb9e09 -> 4b4c3bf3f
[SPARK-19748][SQL] refresh function has a wrong order to do cache invalidate and regenerate the inmemory var for InMemoryFileIndex with FileStatusCache ## What changes were proposed in this pull request? If we refresh an InMemoryFileIndex with a FileStatusCache, it will first use the FileStatusCache to re-generate the cachedLeafFiles etc, then call FileStatusCache.invalidateAll. The order of these two actions is wrong, which leads to the refresh action not taking effect. ``` override def refresh(): Unit = { refresh0() fileStatusCache.invalidateAll() } private def refresh0(): Unit = { val files = listLeafFiles(rootPaths) cachedLeafFiles = new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) cachedPartitionSpec = null } ``` ## How was this patch tested? unit test added Author: windpiger <song...@outlook.com> Closes #17079 from windpiger/fixInMemoryFileIndexRefresh. (cherry picked from commit a350bc16d36c58b48ac01f0258678ffcdb77e793) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b4c3bf3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b4c3bf3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b4c3bf3 Branch: refs/heads/branch-2.1 Commit: 4b4c3bf3f78635d53ff983eabe37a4032947b499 Parents: 04fbb9e Author: windpiger <song...@outlook.com> Authored: Tue Feb 28 00:16:49 2017 -0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Feb 28 00:17:00 2017 -0800 ---------------------------------------------------------------------- .../datasources/InMemoryFileIndex.scala | 2 +- .../execution/datasources/FileIndexSuite.scala | 26 ++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/4b4c3bf3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 7531f0a..ee4d086 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -66,8 +66,8 @@ class InMemoryFileIndex( } override def refresh(): Unit = { - refresh0() fileStatusCache.invalidateAll() + refresh0() } private def refresh0(): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/4b4c3bf3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index b7a472b..c638f5f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -177,6 +177,32 @@ class FileIndexSuite extends SharedSQLContext { assert(catalog2.allFiles().nonEmpty) } } + + test("refresh for InMemoryFileIndex with FileStatusCache") { + withTempDir { dir => + val fileStatusCache = FileStatusCache.getOrCreate(spark) + val dirPath = new Path(dir.getAbsolutePath) + val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) + val catalog = + new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, None, fileStatusCache) { + def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq + def leafDirPaths: Seq[Path] = 
leafDirToChildrenFiles.keys.toSeq + } + + val file = new File(dir, "text.txt") + stringToFile(file, "text") + assert(catalog.leafDirPaths.isEmpty) + assert(catalog.leafFilePaths.isEmpty) + + catalog.refresh() + + assert(catalog.leafFilePaths.size == 1) + assert(catalog.leafFilePaths.head == fs.makeQualified(new Path(file.getAbsolutePath))) + + assert(catalog.leafDirPaths.size == 1) + assert(catalog.leafDirPaths.head == fs.makeQualified(dirPath)) + } + } } class FakeParentPathFileSystem extends RawLocalFileSystem { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org