Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1bbf9ff63 -> 39d2fdb51


[SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files

## What changes were proposed in this pull request?

When the metadata logs for various parts of Structured Streaming are stored on 
non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves 
hidden HDFS-style checksum (CRC) files in the log directory, one file per 
batch. This PR modifies HDFSMetadataLog so that it detects the use of a 
filesystem that doesn't use CRC files and removes the CRC files.

## How was this patch tested?

Modified an existing test case in HDFSMetadataLogSuite to check whether 
HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem.  Ran 
the entire regression suite.
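
The cleanup step and the directory check described above can be sketched without the Hadoop classes. This is a minimal illustration only, assuming plain `java.nio.file` in place of the `fileManager` abstraction used by HDFSMetadataLog; the object name `CrcCleanupSketch` and the helpers `crcSidecar` and `commit` are hypothetical and do not appear in the patch.

```scala
import java.nio.file.{Files, Path}

object CrcCleanupSketch {
  // Hypothetical helper: the Hadoop-style checksum sidecar of a file is a
  // hidden sibling named ".<name>.crc".
  def crcSidecar(file: Path): Path =
    file.resolveSibling(s".${file.getFileName}.crc")

  // Rename the temp file to its final name, then delete any sidecar that the
  // rename left behind. Files.move without options fails if the target
  // already exists, which mirrors the "someone has committed the batch" case.
  def commit(tempFile: Path, finalFile: Path): Unit = {
    Files.move(tempFile, finalFile)
    Files.deleteIfExists(crcSidecar(tempFile))
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("metadata-log")
    val temp = dir.resolve("temp-batch")
    Files.write(temp, "batch 2".getBytes("UTF-8"))
    // Simulate the stray checksum file written next to the temp file.
    Files.write(crcSidecar(temp), Array[Byte](0))

    commit(temp, dir.resolve("2"))

    // Only the committed batch file should remain in the directory.
    println(dir.toFile.list().sorted.mkString(", "))
  }
}
```

As in the modified test case, the check at the end is that the log directory holds exactly one file, named "2", once the batch has been committed.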

Author: frreiss <frre...@us.ibm.com>

Closes #15027 from frreiss/fred-17475.

(cherry picked from commit 620da3b4828b3580c7ed7339b2a07938e6be1bb1)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39d2fdb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39d2fdb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39d2fdb5

Branch: refs/heads/branch-2.1
Commit: 39d2fdb51233ed9b1aaf3adaa3267853f5e58c0f
Parents: 1bbf9ff
Author: frreiss <frre...@us.ibm.com>
Authored: Tue Nov 1 23:00:17 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Nov 1 23:00:28 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 5 +++++
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala   | 6 ++++++
 2 files changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39d2fdb5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index c7235320..9a0f87c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -148,6 +148,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
           // It will fail if there is an existing file (someone has committed the batch)
           logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
           fileManager.rename(tempPath, batchIdToPath(batchId))
+
+          // SPARK-17475: HDFSMetadataLog should not leak CRC files
+          // If the underlying filesystem didn't rename the CRC file, delete it.
+          val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
+          if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
           return
         } catch {
           case e: IOException if isFileAlreadyExistsException(e) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/39d2fdb5/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9c1d26d..d03e08d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -119,6 +119,12 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
       assert(metadataLog.get(1).isEmpty)
       assert(metadataLog.get(2).isDefined)
       assert(metadataLog.getLatest().get._1 == 2)
+
+      // There should be exactly one file, called "2", in the metadata directory.
+      // This check also tests for regressions of SPARK-17475
+      val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+      assert(allFiles.size == 1)
+      assert(allFiles(0).getName() == "2")
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
