Repository: spark
Updated Branches:
  refs/heads/master 38e624a73 -> 90095bf3c


[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file

This PR adds a `finalize` method to DiskMapIterator to clean up the resources even if an exception occurs while processing the data.
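
For context, the pattern the patch relies on is sketched below. This is a minimal, hypothetical example (the class name and fields are invented for illustration and are not the Spark code): an iterator that owns a temporary file overrides `finalize()` as a safety net, so the stream is closed and the file deleted even if the caller abandons the iterator or an exception interrupts the read loop.

    import java.io.{BufferedInputStream, File, FileInputStream}

    // Hypothetical sketch, not the actual DiskMapIterator.
    class TempFileByteIterator(file: File) extends Iterator[Int] {
      @volatile private var closed = false
      private val stream = new BufferedInputStream(new FileInputStream(file))
      private var nextByte = stream.read()      // one-byte lookahead; -1 means end of file

      override def hasNext: Boolean = !closed && nextByte != -1

      override def next(): Int = {
        if (!hasNext) throw new NoSuchElementException
        val current = nextByte
        nextByte = stream.read()
        if (nextByte == -1) cleanup()           // eager cleanup once the file is drained
        current
      }

      private def cleanup(): Unit = {
        if (!closed) {
          closed = true
          try {
            stream.close()
          } finally {
            if (file.exists()) file.delete()    // delete the temp file even if close() throws
          }
        }
      }

      // Safety net: the GC may run this from any thread, hence `closed` is volatile.
      override def finalize(): Unit = {
        try cleanup() finally super.finalize()
      }
    }

If the consumer stops iterating early, or an exception propagates out of the read loop, the GC-triggered `finalize()` still closes the stream and deletes the temporary file.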

Author: zsxwing <zsxw...@gmail.com>

Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits:

d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90095bf3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90095bf3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90095bf3

Branch: refs/heads/master
Commit: 90095bf3ce9304d09a32ceffaa99069079071b59
Parents: 38e624a
Author: zsxwing <zsxw...@gmail.com>
Authored: Thu Feb 19 18:37:31 2015 +0000
Committer: Ubuntu <ubu...@ip-172-31-36-14.us-west-2.compute.internal>
Committed: Thu Feb 19 18:37:31 2015 +0000

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 52 ++++++++++++++++----
 1 file changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90095bf3/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8a0f5a6..fc7e86e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C](
     private var batchIndex = 0  // Which batch we're in
     private var fileStream: FileInputStream = null
 
+    @volatile private var closed = false
+
+    // A volatile variable to remember which DeserializationStream is in use. It must be set when we
+    // open a DeserializationStream. However, we should use `deserializeStream` rather than
+    // `deserializeStreamToBeClosed` to read the content, because touching a volatile variable will
+    // reduce performance. It must be volatile so that we can see its correct value in the
+    // `finalize` method, which could run in any thread.
+    @volatile private var deserializeStreamToBeClosed: DeserializationStream = null
+
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
     private var deserializeStream = nextBatchStream()
@@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C](
       // we're still in a valid batch.
       if (batchIndex < batchOffsets.length - 1) {
         if (deserializeStream != null) {
+          deserializeStreamToBeClosed = null
           deserializeStream.close()
           fileStream.close()
           deserializeStream = null
@@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C](
 
         val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
         val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
-        ser.deserializeStream(compressedStream)
+        // Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
+        // close it in `finalize` and also avoid touching the volatile `deserializeStreamToBeClosed`
+        // while reading the (K, C) pairs.
+        deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
+        deserializeStreamToBeClosed
       } else {
         // No more batches left
         cleanup()
@@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C](
       item
     }
 
-    // TODO: Ensure this gets called even if the iterator isn't drained.
-    private def cleanup() {
-      batchIndex = batchOffsets.length  // Prevent reading any other batch
-      val ds = deserializeStream
-      deserializeStream = null
-      fileStream = null
-      ds.close()
-      file.delete()
+    // TODO: For now, only use `finalize` to ensure `close` gets called to clean up the resources. In
+    // the future, we need some mechanism to ensure this gets called once the resources are no longer used.
+    private def cleanup(): Unit = {
+      if (!closed) {
+        closed = true
+        batchIndex = batchOffsets.length  // Prevent reading any other batch
+        fileStream = null
+        try {
+          val ds = deserializeStreamToBeClosed
+          deserializeStreamToBeClosed = null
+          deserializeStream = null
+          if (ds != null) {
+            ds.close()
+          }
+        } finally {
+          if (file.exists()) {
+            file.delete()
+          }
+        }
+      }
+    }
+
+    override def finalize(): Unit = {
+      try {
+        cleanup()
+      } finally {
+        super.finalize()
+      }
     }
   }
 

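To make the comments in the patch concrete, here is a rough, hypothetical sketch (names invented, not the Spark code) of the field layout they describe: the hot path reads through a plain field, while a second, volatile field holds the same handle so that cleanup running on an arbitrary thread (for example from `finalize()`) sees the most recently opened stream and can close it.

    import java.io.{File, FileInputStream}

    // Hypothetical sketch of the volatile-handle pattern, not the actual Spark code.
    class BatchReader(file: File) {
      @volatile private var closed = false
      // Only read by cleanup()/finalize(); volatile so any thread sees the latest handle.
      @volatile private var streamToBeClosed: FileInputStream = null
      // Used on the hot path; a plain field, so reads avoid the volatile access cost.
      private var stream: FileInputStream = openStream()

      private def openStream(): FileInputStream = {
        val s = new FileInputStream(file)
        streamToBeClosed = s                    // publish the handle for the cleanup path
        s
      }

      def readByte(): Int = stream.read()       // hot path touches only the non-volatile field

      def cleanup(): Unit = {
        if (!closed) {
          closed = true
          val s = streamToBeClosed
          streamToBeClosed = null
          stream = null
          try {
            if (s != null) s.close()
          } finally {
            if (file.exists()) file.delete()
          }
        }
      }

      override def finalize(): Unit = {
        try cleanup() finally super.finalize()
      }
    }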
