This is an automated email from the ASF dual-hosted git repository.
jmclean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new d32b0ddcf6 [#10598] fix(core): close cached Lance datasets before
closing Arrow allocator (#10619)
d32b0ddcf6 is described below
commit d32b0ddcf697d4b24d4fefc47f7a6b75a8074c96
Author: pythaac <[email protected]>
AuthorDate: Thu Apr 2 13:34:20 2026 +0900
[#10598] fix(core): close cached Lance datasets before closing Arrow
allocator (#10619)
### What changes were proposed in this pull request?
1. Modified `LancePartitionStatisticStorage.close()` to explicitly
iterate over all cached `Dataset` instances and call `.close()` before
calling `cache.invalidateAll()` and `allocator.close()`.
2. Added `testCloseReleasesCachedDatasetBeforeAllocator`, which verifies
both the closing order (via Mockito `InOrder`) and that native Arrow
memory is actually released (`getAllocatedMemory() == 0`).
### Why are the changes needed?
As reported in the issue, closing the `allocator` before the cached
`Dataset` instances causes an exception at shutdown.
However, simply swapping the closing order is not enough. If we rely
solely on `invalidateAll()`, `dataset.close()` is never called, so native
Arrow memory is silently leaked and `allocator.close()` still fails with
`IllegalStateException: Allocator closed with outstanding buffers
allocated`.
Therefore, each cached dataset must be closed explicitly before the
allocator is closed, so that off-heap resources are released safely.
Fix: #10598
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Added and ran `testCloseReleasesCachedDatasetBeforeAllocator`, which
uses a Mockito `spy()` of a real `RootAllocator` and a `VarCharVector`
allocated from it.
- Verified that `Dataset.close()` is called strictly before
`allocator.close()`.
- Asserted that `allocator.getAllocatedMemory()` drops to 0 bytes after
`storage.close()`.
---
.../storage/LancePartitionStatisticStorage.java | 18 ++++++-
.../TestLancePartitionStatisticStorage.java | 58 ++++++++++++++++++++++
2 files changed, 74 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
index 918c94218c..ea2d000cc5 100644
---
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
+++
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
@@ -76,6 +76,8 @@ import org.apache.gravitino.stats.PartitionStatisticsUpdate;
import org.apache.gravitino.stats.StatisticValue;
import org.apache.gravitino.utils.MetadataObjectUtil;
import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** LancePartitionStatisticStorage is based on Lance format files. */
public class LancePartitionStatisticStorage implements
PartitionStatisticStorage {
@@ -131,6 +133,8 @@ public class LancePartitionStatisticStorage implements
PartitionStatisticStorage
private final EntityStore entityStore =
GravitinoEnv.getInstance().entityStore();
+ private static final Logger LOG =
LoggerFactory.getLogger(LancePartitionStatisticStorage.class);
+
public LancePartitionStatisticStorage(Map<String, String> properties) {
this.allocator = new RootAllocator();
this.location = properties.getOrDefault(LOCATION, DEFAULT_LOCATION);
@@ -350,12 +354,22 @@ public class LancePartitionStatisticStorage implements
PartitionStatisticStorage
@Override
public void close() throws IOException {
+ if (datasetCache.isPresent()) {
+ Cache<Long, Dataset> cache = datasetCache.get();
+ for (Dataset dataset : cache.asMap().values()) {
+ try {
+ dataset.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close cached Lance dataset", e);
+ }
+ }
+ cache.invalidateAll();
+ }
+
if (allocator != null) {
allocator.close();
}
- datasetCache.ifPresent(Cache::invalidateAll);
-
if (scheduler != null) {
scheduler.shutdown();
}
diff --git
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
index 7a6755870f..d1bee34f38 100644
---
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
+++
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
@@ -19,15 +19,23 @@
package org.apache.gravitino.stats.storage;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.lancedb.lance.Dataset;
import java.io.File;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.EntityStore;
@@ -42,6 +50,7 @@ import org.apache.gravitino.stats.StatisticValue;
import org.apache.gravitino.stats.StatisticValues;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
public class TestLancePartitionStatisticStorage {
@@ -594,4 +603,53 @@ public class TestLancePartitionStatisticStorage {
storage.close();
}
}
+
+ @Test // Regression test for #10598: storage.close() must close cached Lance datasets before closing the Arrow allocator, and leave no allocated memory.
+ public void testCloseReleasesCachedDatasetBeforeAllocator() throws Exception
{
+ String location =
Files.createTempDirectory("lance_stats_close_cache").toString();
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("location", location);
+ properties.put("datasetCacheSize", "10"); // non-zero size; presumably enables the dataset cache (getDatasetCache() is asserted non-null below) — confirm
+ // Stub the entity store. The storage reads GravitinoEnv's entityStore in a field initializer, so the stub must be installed before construction.
+ EntityStore entityStore = org.mockito.Mockito.mock(EntityStore.class); // NOTE(review): Mockito.mock is already statically imported; the fully qualified call is redundant.
+ TableEntity tableEntity = org.mockito.Mockito.mock(TableEntity.class);
+ when(entityStore.get(any(), any(), any())).thenReturn(tableEntity);
+ when(tableEntity.id()).thenReturn(1L);
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore",
entityStore, true); // NOTE(review): the previous GravitinoEnv entityStore is not restored by this test — confirm no later test depends on it.
+
+ LancePartitionStatisticStorage storage = new
LancePartitionStatisticStorage(properties);
+
+ try {
+ BufferAllocator allocator = spy(new RootAllocator(Long.MAX_VALUE)); // spied so InOrder can verify close() on the allocator
+ FieldUtils.writeField(storage, "allocator", allocator, true); // NOTE(review): this overwrites the RootAllocator created in the constructor, which is never closed here and may leak in the test.
+ // The test relies on getDatasetCache() returning the storage's live cache, so the injected entry is seen by close() — verify.
+ Cache<Long, Dataset> datasetCache = storage.getDatasetCache();
+ Assertions.assertNotNull(datasetCache);
+ // A mock Dataset whose close() frees a real Arrow buffer allocated from the spied allocator.
+ Dataset dataset = mock(Dataset.class);
+ VarCharVector buffer = new VarCharVector("test", allocator);
+ buffer.allocateNew(1024);
+ // Closing the dataset is the only path in this test that releases the buffer.
+ doAnswer(
+ invocation -> {
+ buffer.close();
+ return null;
+ })
+ .when(dataset)
+ .close();
+ // Inject the dataset directly into the cache so storage.close() has something to release.
+ datasetCache.put(1L, dataset);
+
+ storage.close();
+ // All memory from the allocator must have been released once close() returns.
+ Assertions.assertEquals(0, allocator.getAllocatedMemory());
+ // The dataset must be closed strictly before the allocator.
+ InOrder inOrder = inOrder(dataset, allocator);
+ inOrder.verify(dataset).close();
+ inOrder.verify(allocator).close();
+
+ } finally { // NOTE(review): only the temp directory is cleaned up; if something fails before storage.close(), the storage and buffer stay open.
+ FileUtils.deleteDirectory(new File(location));
+ }
+ }
}