Repository: spark Updated Branches: refs/heads/branch-2.1 9b749b6ce -> 6f366fbbf
[SPARK-21721][SQL][BACKPORT-2.1] Clear FileSystem deleteOnExit cache when paths are successfully removed ## What changes were proposed in this pull request? Backport SPARK-21721 to branch 2.1: We put staging path to delete into the deleteOnExit cache of FileSystem in case the path can't be successfully removed. But when we successfully remove the path, we don't remove it from the cache. We should do it to avoid continually growing the cache size. ## How was this patch tested? Added test. Author: Liang-Chi Hsieh <vii...@gmail.com> Closes #18947 from viirya/SPARK-21721-backport-2.1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f366fbb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f366fbb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f366fbb Branch: refs/heads/branch-2.1 Commit: 6f366fbbf8dc0a00050040891635e1caae8a4faa Parents: 9b749b6 Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Tue Aug 15 08:48:00 2017 -0700 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Tue Aug 15 08:48:00 2017 -0700 ---------------------------------------------------------------------- .../hive/execution/InsertIntoHiveTable.scala | 8 +++++++- .../sql/hive/execution/SQLQuerySuite.scala | 21 +++++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6f366fbb/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 3b9c2fc..3567819 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -382,7 +382,13 @@ case class InsertIntoHiveTable( // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. try { - createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) } + createdTempDir.foreach { path => + val fs = path.getFileSystem(hadoopConf) + if (fs.delete(path, true)) { + // If we successfully delete the staging directory, remove it from FileSystem's cache. + fs.cancelDeleteOnExit(path) + } + } } catch { case NonFatal(e) => logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) http://git-wip-us.apache.org/repos/asf/spark/blob/6f366fbb/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 1619115..73ceaf8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -20,12 +20,13 @@ package org.apache.spark.sql.hive.execution import java.io.{File, PrintWriter} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} +import java.util.Set import scala.sys.process.{Process, ProcessLogger} import scala.util.Try import com.google.common.io.Files -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier @@ -2031,4 +2032,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { checkAnswer(table.filter($"p" === "p1\" and q=\"q1").select($"a"), Row(4)) } } + + 
test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed") { + withTable("test21721") { + val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit") + deleteOnExitField.setAccessible(true) + + val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) + val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]] + + val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF() + sql("CREATE TABLE test21721 (key INT, value STRING)") + val pathSizeToDeleteOnExit = setOfPath.size() + + (0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test1")) + + assert(setOfPath.size() == pathSizeToDeleteOnExit) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org