This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7258f69  [SPARK-35396] Add AutoCloseable close to BlockManager and InMemoryRelation
7258f69 is described below

commit 7258f691887aedcf7ba3eb4e478d67a5637643b9
Author: Chendi Xue <chendi....@intel.com>
AuthorDate: Tue May 25 08:55:25 2021 -0500

    [SPARK-35396] Add AutoCloseable close to BlockManager and InMemoryRelation
    
    This PR proposes an add-on that supports manually closing entries in MemoryStore and InMemoryRelation.
    
    ### What changes were proposed in this pull request?
    Currently:
        MemoryStore uses a LinkedHashMap[BlockId, MemoryEntry[_]] to store all OnHeap or OffHeap entries.
        When memoryStore.remove(blockId) is called, the code simply removes the entry from the LinkedHashMap and relies on Java GC to do the release work.
    
    This PR:
        We propose an add-on that manually closes any object stored in MemoryStore or InMemoryRelation if the object extends AutoCloseable.
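    
    A minimal standalone sketch of the idea (illustrative only: `CloseOnEvict` and its string keys are stand-ins, not Spark APIs; the real implementation is in the MemoryStore diff below):
    
    ```scala
    import scala.collection.mutable
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    import scala.util.control.NonFatal
    
    // Illustrative cache showing only the close-on-evict idea.
    object CloseOnEvict {
      private val cache = mutable.LinkedHashMap[String, Any]()
    
      def put(key: String, value: Any): Unit = cache.synchronized {
        cache.put(key, value)
      }
    
      def remove(key: String): Boolean = cache.synchronized {
        val removed = cache.remove(key)
        removed.foreach {
          // Close AutoCloseable values asynchronously so eviction is not blocked.
          case c: AutoCloseable =>
            Future {
              try c.close() catch { case NonFatal(_) => /* log and continue */ }
            }
          case _ =>
        }
        removed.isDefined
      }
    }
    ```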
    
    Verification:
        In our own use case, we implemented a user-defined off-heap hash relation for broadcast hash join (BHJ), and we verified that with this manual close our off-heap hash relation is released when eviction is triggered.
        We also implemented a user-defined CachedBatch, which with this PR is released when InMemoryRelation.clearCache() is called.
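    
    For reference, such a user-defined batch might look like the following sketch (hypothetical names throughout; only the `CachedBatch` trait with `numRows` and `sizeInBytes`, a developer API in `org.apache.spark.sql.columnar` since Spark 3.1, is real):
    
    ```scala
    import org.apache.spark.sql.columnar.CachedBatch
    
    // Hypothetical wrapper around a native allocation; not a Spark class.
    class NativeBuffer(val sizeInBytes: Long) extends AutoCloseable {
      override def close(): Unit = { /* free the native allocation here */ }
    }
    
    // A cacheable batch whose close() releases the off-heap data.
    class OffHeapCachedBatch(buffer: NativeBuffer, override val numRows: Int)
      extends CachedBatch with AutoCloseable {
      override def sizeInBytes: Long = buffer.sizeInBytes
      override def close(): Unit = buffer.close()
    }
    ```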
    
    ### Why are the changes needed?
    This change helps clean up user-defined off-heap objects that may be cached in InMemoryRelation or MemoryStore.
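    
    As a usage illustration (a sketch, not part of this patch: `NativeThing` is hypothetical, and MEMORY_ONLY keeps values deserialized so the AutoCloseable instances themselves live in MemoryStore):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel
    
    // Hypothetical resource holding native memory.
    class NativeThing extends AutoCloseable with Serializable {
      override def close(): Unit = { /* release native memory */ }
    }
    
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    val things = spark.sparkContext.parallelize(1 to 10).map(_ => new NativeThing)
    things.persist(StorageLevel.MEMORY_ONLY)
    things.count()
    // With this patch, removing the blocks also close()s the cached values.
    things.unpersist()
    ```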
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    WIP; new unit tests for remove and clear of AutoCloseable entries are included in MemoryStoreSuite (see the diff below).
    
    Signed-off-by: Chendi Xue <chendi.xue@intel.com>
    
    Closes #32534 from xuechendi/support_manual_close_in_memorystore.
    
    Authored-by: Chendi Xue <chendi....@intel.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../apache/spark/storage/memory/MemoryStore.scala  |  32 ++++++-
 .../apache/spark/storage/MemoryStoreSuite.scala    | 100 +++++++++++++++++++++
 2 files changed, 131 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 375d05b..2079b26 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -21,9 +21,14 @@ import java.io.OutputStream
 import java.nio.ByteBuffer
 import java.util.LinkedHashMap
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
 import scala.reflect.ClassTag
+import scala.util.{Failure, Success}
+import scala.util.control.NonFatal
 
 import com.google.common.io.ByteStreams
 
@@ -385,9 +390,33 @@ private[spark] class MemoryStore(
     }
   }
 
+  def manualClose[T <: MemoryEntry[_]](entry: T): T = {
+    val entryManualCloseTasks = Future {
+      entry match {
+        case e: DeserializedMemoryEntry[_] => e.value.foreach {
+          case o: AutoCloseable =>
+            try {
+              o.close()
+            } catch {
+              case NonFatal(e) =>
+                logWarning("Got NonFatal exception during remove", e)
+            }
+          case _ =>
+        }
+        case _ =>
+      }
+    }
+    entryManualCloseTasks.onComplete {
+      case Success(_) =>
+      case Failure(e) => throw e
+    }
+    entry
+  }
+
   def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
     val entry = entries.synchronized {
-      entries.remove(blockId)
+      val removed = entries.remove(blockId)
+      manualClose(removed)
     }
     if (entry != null) {
       entry match {
@@ -405,6 +434,7 @@ private[spark] class MemoryStore(
 
   def clear(): Unit = memoryManager.synchronized {
     entries.synchronized {
+      entries.values.asScala.foreach(manualClose)
       entries.clear()
     }
     onHeapUnrollMemoryMap.clear()
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index d6a4e5b..fea4882 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -546,4 +546,104 @@ class MemoryStoreSuite
       }
     }
   }
+
+  test("put user-defined objects to MemoryStore and remove") {
+    val (memoryStore, _) = makeMemoryStore(12000)
+    val blockId = BlockId("rdd_3_10")
+    case class DummyAllocator() {
+      private var allocated: Int = 0
+      def alloc(size: Int): Unit = synchronized {
+        allocated += size
+      }
+      def release(size: Int): Unit = synchronized {
+        allocated -= size
+      }
+      def getAllocatedMemory: Int = synchronized {
+        allocated
+      }
+    }
+    case class NativeObject(alloc: DummyAllocator, size: Int)
+      extends KnownSizeEstimation
+      with AutoCloseable {
+      alloc.alloc(size)
+      var allocated_size: Int = size
+      override def estimatedSize: Long = allocated_size
+      override def close(): Unit = synchronized {
+        alloc.release(allocated_size)
+        allocated_size = 0
+      }
+    }
+    val allocator = DummyAllocator()
+    val nativeObjList = List.fill(40)(NativeObject(allocator, 100))
+    def nativeObjIterator: Iterator[Any] = nativeObjList.iterator.asInstanceOf[Iterator[Any]]
+    def putIteratorAsValues[T](
+        blockId: BlockId,
+        iter: Iterator[T],
+        classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
+      memoryStore.putIteratorAsValues(blockId, iter, classTag)
+    }
+
+    // Unroll with plenty of space. This should succeed and cache both blocks.
+    assert(putIteratorAsValues("b1", nativeObjIterator, ClassTag.Any).isRight)
+    assert(putIteratorAsValues("b2", nativeObjIterator, ClassTag.Any).isRight)
+
+    memoryStore.remove("b1")
+    memoryStore.remove("b2")
+
+    // Check if allocator was cleared.
+    while (allocator.getAllocatedMemory > 0) {
+      Thread.sleep(500)
+    }
+    assert(allocator.getAllocatedMemory == 0)
+  }
+
+  test("put user-defined objects to MemoryStore and clear") {
+    val (memoryStore, _) = makeMemoryStore(12000)
+    val blockId = BlockId("rdd_3_10")
+    case class DummyAllocator() {
+      private var allocated: Int = 0
+      def alloc(size: Int): Unit = synchronized {
+        allocated += size
+      }
+      def release(size: Int): Unit = synchronized {
+        allocated -= size
+      }
+      def getAllocatedMemory: Int = synchronized {
+        allocated
+      }
+    }
+    case class NativeObject(alloc: DummyAllocator, size: Int)
+      extends KnownSizeEstimation
+      with AutoCloseable {
+
+      alloc.alloc(size)
+      var allocated_size: Int = size
+      override def estimatedSize: Long = allocated_size
+      override def close(): Unit = synchronized {
+        Thread.sleep(10)
+        alloc.release(allocated_size)
+        allocated_size = 0
+      }
+    }
+    val allocator = DummyAllocator()
+    val nativeObjList = List.fill(40)(NativeObject(allocator, 100))
+    def nativeObjIterator: Iterator[Any] = nativeObjList.iterator.asInstanceOf[Iterator[Any]]
+    def putIteratorAsValues[T](
+        blockId: BlockId,
+        iter: Iterator[T],
+        classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
+      memoryStore.putIteratorAsValues(blockId, iter, classTag)
+    }
+
+    // Unroll with plenty of space. This should succeed and cache both blocks.
+    assert(putIteratorAsValues("b1", nativeObjIterator, ClassTag.Any).isRight)
+    assert(putIteratorAsValues("b2", nativeObjIterator, ClassTag.Any).isRight)
+
+    memoryStore.clear()
+    // Check if allocator was cleared.
+    while (allocator.getAllocatedMemory > 0) {
+      Thread.sleep(500)
+    }
+    assert(allocator.getAllocatedMemory == 0)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
