[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21322

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r224874828

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,30 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(resource: (BlockId, MemoryEntry[_])): Unit = {
+    maybeReleaseResources(resource._1, resource._2)
+  }
+
+  private def maybeReleaseResources(blockId: BlockId, entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values, blockId)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(values: Array[Any], blockId: BlockId): Unit = {
+    if (blockId.isBroadcast) {
+      values.foreach(value => Utils.tryClose(value))
--- End diff --

Just a style thing, but could be `values.foreach(Utils.tryClose)`
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r224875899

--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1930,6 +1930,18 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  def tryClose(value: Any): Unit = {
--- End diff --

This should accept at best `AnyRef`. It doesn't really seem like we need a new global utility method for this. It's a little unusual to try closing things that aren't `Closeable`, and we can try to rationalize that in the callers above if possible.
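For illustration, a minimal sketch of such a helper with the suggested `AnyRef` signature. The `tryClose` name is the PR's proposal, not an existing Spark API, and this sketch swallows failures where the real patch would log a warning:

```scala
// Hedged sketch of the helper under discussion; not Spark code.
// The AnyRef parameter follows the review suggestion: it rules out
// primitives (AnyVal) at compile time, which can never be AutoCloseable.
def tryClose(value: AnyRef): Unit = value match {
  case closeable: AutoCloseable =>
    // Swallow failures; a real implementation would log a warning instead.
    try closeable.close() catch { case _: Exception => () }
  case _ => () // not AutoCloseable: nothing to do
}

var closed = false
tryClose(new AutoCloseable { def close(): Unit = closed = true })
tryClose("a plain string") // no-op: strings are not AutoCloseable
// tryClose(42) would not compile: Int is an AnyVal, not an AnyRef
```

With `Any` instead of `AnyRef`, the third call would compile and silently do nothing, which is the kind of oddity the review comment is pointing at.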
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r224875111

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,30 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(resource: (BlockId, MemoryEntry[_])): Unit = {
+    maybeReleaseResources(resource._1, resource._2)
+  }
+
+  private def maybeReleaseResources(blockId: BlockId, entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
--- End diff --

Why not just make these case classes `Closeable`, and then you can close them consistently?
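srowen's `Closeable` suggestion could be sketched like this, using simplified stand-ins for the real `MemoryEntry` types (the names and fields here are illustrative, not Spark's actual definitions):

```scala
import java.io.Closeable

// Simplified stand-ins for Spark's MemoryEntry case classes (illustrative only):
// making each entry Closeable lets callers release resources uniformly.
sealed trait MemoryEntrySketch extends Closeable

final case class SerializedEntrySketch(dispose: () => Unit) extends MemoryEntrySketch {
  // In Spark this would call dispose() on the entry's ChunkedByteBuffer.
  def close(): Unit = dispose()
}

final case class DeserializedEntrySketch(values: Array[Any]) extends MemoryEntrySketch {
  def close(): Unit = values.foreach {
    case c: AutoCloseable => try c.close() catch { case _: Exception => () }
    case _ => ()
  }
}

// Uniform release: no per-subtype pattern match at the call site.
def release(entry: MemoryEntrySketch): Unit = entry.close()
```

The design point is that `remove` and `clear` would then call `entry.close()` without repeating the match on entry subtypes.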
Github user JeetKunDoug commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r189276054

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,14 +385,37 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(values: Array[Any]): Unit = {
+    values.foreach {
+      case closable: AutoCloseable =>
+        safelyCloseValue(closable)
+      case _ =>
+    }
+  }
+
+  private def safelyCloseValue(closable: AutoCloseable): Unit = {
+    try {
+      closable.close()
+    } catch {
+      case ex: Exception => logWarning(s"Failed to close AutoClosable $closable", ex)
+    }
+  }
+
   def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
     val entry = entries.synchronized {
       entries.remove(blockId)
     }
     if (entry != null) {
-      entry match {
-        case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
-        case _ =>
+      if (blockId.isBroadcast) {
+        maybeReleaseResources(entry)
--- End diff --

@dbtsai thanks - that whole "my day job vs. OSS" rush to fix. Will take care of it correctly and push an update.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r189210100

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,14 +385,37 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(values: Array[Any]): Unit = {
+    values.foreach {
+      case closable: AutoCloseable =>
+        safelyCloseValue(closable)
+      case _ =>
+    }
+  }
+
+  private def safelyCloseValue(closable: AutoCloseable): Unit = {
+    try {
+      closable.close()
+    } catch {
+      case ex: Exception => logWarning(s"Failed to close AutoClosable $closable", ex)
+    }
+  }
+
   def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
--- End diff --

If we do it in `remove`, I don't think we can avoid the issue I mentioned before. If you have a deserialized value in the broadcast cache, it can be cleaned by GC when the broadcast value isn't stored as a deserialized entry in `MemoryStore`. If the object has already claimed resources that we want to release through the `AutoCloseable` interface, we won't properly release them when it's cleaned by GC. That happens before `remove` is called.
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r189187759

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -404,6 +428,7 @@ private[spark] class MemoryStore(
   def clear(): Unit = memoryManager.synchronized {
     entries.synchronized {
+      entries.values().asScala.foreach(maybeReleaseResources)
--- End diff --

Should we check that the keys satisfy `blockId.isBroadcast == true`?
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r189187178

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,14 +385,37 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(values: Array[Any]): Unit = {
+    values.foreach {
+      case closable: AutoCloseable =>
+        safelyCloseValue(closable)
+      case _ =>
+    }
+  }
+
+  private def safelyCloseValue(closable: AutoCloseable): Unit = {
+    try {
+      closable.close()
+    } catch {
+      case ex: Exception => logWarning(s"Failed to close AutoClosable $closable", ex)
+    }
+  }
+
   def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
     val entry = entries.synchronized {
       entries.remove(blockId)
     }
     if (entry != null) {
-      entry match {
-        case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
-        case _ =>
+      if (blockId.isBroadcast) {
+        maybeReleaseResources(entry)
--- End diff --

In this case, what happens when the blockId is not broadcast? The existing cleanup will not be called.
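One way to address the concern about non-broadcast blocks can be sketched with simplified stand-in types (not Spark's actual code): serialized buffers are disposed for every block, while close() is attempted only for broadcast blocks, so the pre-existing cleanup path is preserved:

```scala
// Illustrative sketch with simplified stand-ins for Spark's entry types.
sealed trait EntrySketch
final case class SerializedSketch(dispose: () => Unit) extends EntrySketch
final case class DeserializedSketch(values: Array[Any]) extends EntrySketch

def releaseResources(isBroadcast: Boolean, entry: EntrySketch): Unit = entry match {
  case SerializedSketch(dispose) =>
    dispose() // existing behavior: dispose the buffer regardless of block type
  case DeserializedSketch(values) if isBroadcast =>
    values.foreach {
      case c: AutoCloseable => try c.close() catch { case _: Exception => () }
      case _ => ()
    }
  case _ => () // deserialized, non-broadcast: nothing extra to release
}
```

The key property is that the `isBroadcast` guard only gates the new close() behavior, not the existing buffer disposal.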
Github user JeetKunDoug commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188325608

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

Actually, digging further, there are other places where we may deserialize an object from the disk store and never put it into the memory store. It seems like punting on a guarantee that your AutoCloseable object is closed, and making this a best-effort thing when calling `BlockManager.removeBroadcast` (which is how we used it in the case that prompted this PR), may make the most sense. It'll still be better than depending on GC and a finalizer to get the resources cleaned up when the driver calls `Broadcast#destroy`, but we can document it as a best practice to have a finalizer just in case the close() call doesn't happen due to edge cases.
Github user JeetKunDoug commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188314886

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

Ah, OK, I see where the issue is. So in this case you may have a deserialized instance, but the memory store is full, so the put fails. Now we've got a live, deserialized object that is not in MemoryStore. Thanks for catching this. It looks like this case could be handled in `MemoryStore.putIteratorAsValues` when the `putIterator` call fails, which would handle several cases in `BlockManager` where we try (and fail) to put deserialized values, but it means checking for potential `AutoCloseable` values any time we fail to put into `MemoryStore`, and I'm not sure of the performance impact of this.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188306362

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

In theory, you can have a working broadcast object that is not in `MemoryStore` at the same time. While storing the merged object into `BlockManager` by calling `putSingle`, it can be stored into the disk store. Once the object is going to be used, if we can't find it in the cache, we call `BlockManager.getLocalValues` to retrieve it from the disk store. Although that will try to store it in `MemoryStore`, it may not succeed. I think the point here is that the change assumes that if there is a deserialized broadcast object, it is definitely in `MemoryStore`. But if I read the code correctly, that is not the case. You can have serialized bytes of the object in the disk store and use a deserialized object at the same time.
Github user JeetKunDoug commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188295537

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

I wouldn't expect a never-deserialized `MemoryEntry` to be closed, as it was never really instantiated to begin with - so if it _only_ lands on disk, I think that's reasonable (as the variable in question would never have had a chance to allocate anything either).
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188128515

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(objs: Array[Any]): Unit = {
+    objs.foreach {
+    case closable: AutoCloseable =>
--- End diff --

indent style: two spaces.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188128177

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

As far as I know, broadcast variables can be serialized to disk too (`BlockManager.doPutIterator`). In that case, it seems `AutoCloseable` broadcast variables won't hit this release logic.
Github user JeetKunDoug commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188074445

--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -526,4 +526,84 @@ class MemoryStoreSuite
       }
     }
   }
+
+  test("[SPARK-24225]: remove should close AutoCloseable object") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker()
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker(true)
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.clear()
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
--- End diff --

So if I understand the API correctly, there is no way to remove a single item that was put as part of a call to `putIterator` - because operations are conducted by `blockId`, you would only be able to remove the whole group of entries, not a single part of an iterator.
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188033806

--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -526,4 +526,84 @@ class MemoryStoreSuite
       }
     }
   }
+
+  test("[SPARK-24225]: remove should close AutoCloseable object") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker()
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker(true)
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.clear()
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id1 = BroadcastBlockId(1)
+    val tracker2 = new CloseTracker
+    val tracker1 = new CloseTracker
+    store.putIteratorAsValues(id1, Iterator(tracker1, tracker2), ClassTag.Any)
+    assert(store.contains(id1))
+    store.clear()
+    assert(tracker1.getClosed())
+    assert(tracker2.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id1 = BroadcastBlockId(1)
+    val id2 = BroadcastBlockId(2)
+    val tracker2 = new CloseTracker(true)
+    val tracker1 = new CloseTracker(true)
+    store.putIteratorAsValues(id1, Iterator(tracker1), ClassTag.Any)
+    store.putIteratorAsValues(id2, Iterator(tracker2), ClassTag.Any)
+    assert(store.contains(id1))
+    assert(store.contains(id2))
+    store.clear()
+    assert(tracker1.getClosed())
+    assert(tracker2.getClosed())
+  }
+}
+
+private class CloseTracker(val throwsOnClosed: Boolean = false) extends AutoCloseable {
+  var closed = false
+  override def close(): Unit = {
+    closed = true
+    if (throwsOnClosed) {
+      throw new RuntimeException("Throwing")
--- End diff --

Could you add `var isExceptionThrown = false`, and check in the test whether the exception is thrown?
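dbtsai's suggestion could look roughly like this variant of the test helper (illustrative only; the field and method names here are hypothetical, not the PR's final code):

```scala
// Variant of the PR's CloseTracker test helper incorporating the suggestion:
// record whether close() actually threw, so tests can assert on it.
class CloseTrackerSketch(val throwOnClose: Boolean = false) extends AutoCloseable {
  private var _closed = false
  private var _exceptionThrown = false

  override def close(): Unit = {
    _closed = true
    if (throwOnClose) {
      _exceptionThrown = true
      throw new RuntimeException("Throwing")
    }
  }

  def closed: Boolean = _closed
  def exceptionThrown: Boolean = _exceptionThrown
}
```

Tests could then assert both that the tracker was closed and that the exception path was actually exercised, which the original `closed` flag alone cannot show.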
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188034118

--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -526,4 +526,84 @@ class MemoryStoreSuite
       }
     }
   }
+
+  test("[SPARK-24225]: remove should close AutoCloseable object") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker()
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker(true)
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.clear()
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id1 = BroadcastBlockId(1)
+    val tracker2 = new CloseTracker
+    val tracker1 = new CloseTracker
+    store.putIteratorAsValues(id1, Iterator(tracker1, tracker2), ClassTag.Any)
+    assert(store.contains(id1))
+    store.clear()
+    assert(tracker1.getClosed())
+    assert(tracker2.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id1 = BroadcastBlockId(1)
+    val id2 = BroadcastBlockId(2)
+    val tracker2 = new CloseTracker(true)
+    val tracker1 = new CloseTracker(true)
+    store.putIteratorAsValues(id1, Iterator(tracker1), ClassTag.Any)
+    store.putIteratorAsValues(id2, Iterator(tracker2), ClassTag.Any)
+    assert(store.contains(id1))
+    assert(store.contains(id2))
+    store.clear()
+    assert(tracker1.getClosed())
+    assert(tracker2.getClosed())
+  }
+}
+
+private class CloseTracker(val throwsOnClosed: Boolean = false) extends AutoCloseable {
+  var closed = false
+  override def close(): Unit = {
+    closed = true
+    if (throwsOnClosed) {
+      throw new RuntimeException("Throwing")
+    }
+  }
+  def getClosed(): Boolean = {
+    closed
--- End diff --

Since `closed` is public, you might use it directly. Or you can make `closed` private.
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188035443

--- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -526,4 +526,84 @@ class MemoryStoreSuite
       }
     }
   }
+
+  test("[SPARK-24225]: remove should close AutoCloseable object") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker()
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker(true)
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.clear()
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
--- End diff --

Can you check that if you have multiple AutoCloseable objects in an iterator, and only one of them is removed, the rest of the objects will not be closed?
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188032854

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(objs: Array[Any]): Unit = {
--- End diff --

`values: Array[Any]`
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21322#discussion_r188032698

--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
--- End diff --

`DeserializedMemoryEntry(values, _, _)` to match the rest of the code style.