Repository: spark
Updated Branches:
  refs/heads/branch-1.1 91d0effb3 -> d70754df0


Revert "[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to 
ensure deleting the temp file"

This reverts commit 36f3c499fd1ad53a68a084d6a16a2c68099e7049.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d70754df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d70754df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d70754df

Branch: refs/heads/branch-1.1
Commit: d70754df04031a8bd99e2aa104303e98e358128e
Parents: 91d0eff
Author: Andrew Or <and...@databricks.com>
Authored: Tue Mar 3 13:05:13 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Mar 3 13:05:13 2015 -0800

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 52 ++++----------------
 1 file changed, 9 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d70754df/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index abb1f11..5619b30 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -434,15 +434,6 @@ class ExternalAppendOnlyMap[K, V, C](
     private var batchIndex = 0  // Which batch we're in
     private var fileStream: FileInputStream = null
 
-    @volatile private var closed = false
-
-    // A volatile variable to remember which DeserializationStream is using. Need to set it when we
-    // open a DeserializationStream. But we should use `deserializeStream` rather than
-    // `deserializeStreamToBeClosed` to read the content because touching a volatile variable will
-    // reduce the performance. It must be volatile so that we can see its correct value in the
-    // `finalize` method, which could run in any thread.
-    @volatile private var deserializeStreamToBeClosed: DeserializationStream = null
-
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
     private var deserializeStream = nextBatchStream()
@@ -457,7 +448,6 @@ class ExternalAppendOnlyMap[K, V, C](
       // we're still in a valid batch.
       if (batchIndex < batchOffsets.length - 1) {
         if (deserializeStream != null) {
-          deserializeStreamToBeClosed = null
           deserializeStream.close()
           fileStream.close()
           deserializeStream = null
@@ -476,11 +466,7 @@ class ExternalAppendOnlyMap[K, V, C](
 
        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
        val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
-        // Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
-        // close it in `finalize` and also avoid to touch the volatile `deserializeStreamToBeClosed`
-        // during reading the (K, C) pairs.
-        deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
-        deserializeStreamToBeClosed
+        ser.deserializeStream(compressedStream)
       } else {
         // No more batches left
         cleanup()
@@ -529,34 +515,14 @@ class ExternalAppendOnlyMap[K, V, C](
       item
     }
 
-    // TODO: Now only use `finalize` to ensure `close` gets called to clean up the resources. In the
-    // future, we need some mechanism to ensure this gets called once the resources are not used.
-    private def cleanup(): Unit = {
-      if (!closed) {
-        closed = true
-        batchIndex = batchOffsets.length  // Prevent reading any other batch
-        fileStream = null
-        try {
-          val ds = deserializeStreamToBeClosed
-          deserializeStreamToBeClosed = null
-          deserializeStream = null
-          if (ds != null) {
-            ds.close()
-          }
-        } finally {
-          if (file.exists()) {
-            file.delete()
-          }
-        }
-      }
-    }
-
-    override def finalize(): Unit = {
-      try {
-        cleanup()
-      } finally {
-        super.finalize()
-      }
+    // TODO: Ensure this gets called even if the iterator isn't drained.
+    private def cleanup() {
+      batchIndex = batchOffsets.length  // Prevent reading any other batch
+      val ds = deserializeStream
+      deserializeStream = null
+      fileStream = null
+      ds.close()
+      file.delete()
     }
   }
 

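For context, the change being reverted above tracked the most recently opened DeserializationStream in a volatile field and relied on a finalize() override to close it and delete the spill file even if the iterator was never fully drained. Below is a minimal, self-contained sketch of that pattern; the class and member names (SpillFileIterator, streamToBeClosed, openStream) are illustrative only and are not Spark's actual API.

import java.io.{Closeable, File, FileInputStream}

// Hypothetical sketch of the finalize-based cleanup pattern removed by this revert.
// The finalizer may run on any thread, so the stream handle it reads must be volatile;
// the ordinary read path keeps using plain fields to avoid volatile-read overhead.
class SpillFileIterator(file: File) {

  @volatile private var closed = false

  // Set whenever a stream is opened; read only by cleanup(), which may be
  // invoked from the finalizer thread.
  @volatile private var streamToBeClosed: Closeable = null

  // Plain (non-volatile) handle used on the hot path, mirroring the original structure.
  private var fileStream: FileInputStream = openStream()

  private def openStream(): FileInputStream = {
    val in = new FileInputStream(file)
    streamToBeClosed = in
    in
  }

  // Idempotent: safe to call from both normal iteration and finalize().
  private def cleanup(): Unit = {
    if (!closed) {
      closed = true
      try {
        val s = streamToBeClosed
        streamToBeClosed = null
        fileStream = null
        if (s != null) s.close()
      } finally {
        if (file.exists()) file.delete()
      }
    }
  }

  override def finalize(): Unit = {
    try {
      cleanup()
    } finally {
      super.finalize()
    }
  }
}

The trade-off this illustrates is the one described in the comments removed by the revert: a volatile handle lets a finalizer on an arbitrary thread see the currently open stream, while reads of the data itself go through a non-volatile field to avoid paying the volatile cost per record.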
