This is an automated email from the ASF dual-hosted git repository.

jmclean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new d32b0ddcf6 [#10598] fix(core): close cached Lance datasets before closing Arrow allocator (#10619)
d32b0ddcf6 is described below

commit d32b0ddcf697d4b24d4fefc47f7a6b75a8074c96
Author: pythaac <[email protected]>
AuthorDate: Thu Apr 2 13:34:20 2026 +0900

    [#10598] fix(core): close cached Lance datasets before closing Arrow allocator (#10619)
    
    ### What changes were proposed in this pull request?
    
    1. Modified `LancePartitionStatisticStorage.close()` to explicitly
    iterate over all cached `Dataset` instances and call `.close()` before
    calling `cache.invalidateAll()` and `allocator.close()`.
    2. Updated `testCloseReleasesCachedDatasetBeforeAllocator` to verify
    both the execution order (`InOrder`) and the actual release of native
    Arrow memory (`getAllocatedMemory() == 0`).
    
    ### Why are the changes needed?
    
    
    The original issue correctly states that closing the `allocator` before
    the `Dataset` causes a shutdown exception.
    
    However, simply swapping the closing order is not enough. If we rely
    solely on `invalidateAll()`, `dataset.close()` is never called, silently
    leaking native Arrow memory and still causing `allocator.close()` to
    crash (`IllegalStateException: Allocator closed with outstanding buffers
    allocated`).
    
    Therefore, explicitly closing the datasets in a loop is strictly
    required to safely release off-heap resources.
    
    Fix: #10598
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    
    ### How was this patch tested?
    - Executed `testCloseReleasesCachedDatasetBeforeAllocator` using Mockito
    `spy()` with a real `RootAllocator` and `VarCharVector`.
    - Verified that `Dataset.close()` strictly happens before
    `allocator.close()`.
    - Asserted that `allocator.getAllocatedMemory()` drops to `0` bytes
    after `storage.close()`.
---
 .../storage/LancePartitionStatisticStorage.java    | 18 ++++++-
 .../TestLancePartitionStatisticStorage.java        | 58 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
 
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
index 918c94218c..ea2d000cc5 100644
--- 
a/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
+++ 
b/core/src/main/java/org/apache/gravitino/stats/storage/LancePartitionStatisticStorage.java
@@ -76,6 +76,8 @@ import org.apache.gravitino.stats.PartitionStatisticsUpdate;
 import org.apache.gravitino.stats.StatisticValue;
 import org.apache.gravitino.utils.MetadataObjectUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** LancePartitionStatisticStorage is based on Lance format files. */
 public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage {
@@ -131,6 +133,8 @@ public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage
 
   private final EntityStore entityStore = 
GravitinoEnv.getInstance().entityStore();
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(LancePartitionStatisticStorage.class);
+
   public LancePartitionStatisticStorage(Map<String, String> properties) {
     this.allocator = new RootAllocator();
     this.location = properties.getOrDefault(LOCATION, DEFAULT_LOCATION);
@@ -350,12 +354,22 @@ public class LancePartitionStatisticStorage implements 
PartitionStatisticStorage
 
   @Override
   public void close() throws IOException {
+    if (datasetCache.isPresent()) {
+      Cache<Long, Dataset> cache = datasetCache.get();
+      for (Dataset dataset : cache.asMap().values()) {
+        try {
+          dataset.close();
+        } catch (Exception e) {
+          LOG.warn("Failed to close cached Lance dataset", e);
+        }
+      }
+      cache.invalidateAll();
+    }
+
     if (allocator != null) {
       allocator.close();
     }
 
-    datasetCache.ifPresent(Cache::invalidateAll);
-
     if (scheduler != null) {
       scheduler.shutdown();
     }
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
index 7a6755870f..d1bee34f38 100644
--- 
a/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
+++ 
b/core/src/test/java/org/apache/gravitino/stats/storage/TestLancePartitionStatisticStorage.java
@@ -19,15 +19,23 @@
 package org.apache.gravitino.stats.storage;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import com.github.benmanes.caffeine.cache.Cache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.lancedb.lance.Dataset;
 import java.io.File;
 import java.nio.file.Files;
 import java.util.List;
 import java.util.Map;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VarCharVector;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.EntityStore;
@@ -42,6 +50,7 @@ import org.apache.gravitino.stats.StatisticValue;
 import org.apache.gravitino.stats.StatisticValues;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
 
 public class TestLancePartitionStatisticStorage {
 
@@ -594,4 +603,53 @@ public class TestLancePartitionStatisticStorage {
       storage.close();
     }
   }
+
+  @Test
+  public void testCloseReleasesCachedDatasetBeforeAllocator() throws Exception 
{
+    String location = 
Files.createTempDirectory("lance_stats_close_cache").toString();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("location", location);
+    properties.put("datasetCacheSize", "10");
+
+    EntityStore entityStore = org.mockito.Mockito.mock(EntityStore.class);
+    TableEntity tableEntity = org.mockito.Mockito.mock(TableEntity.class);
+    when(entityStore.get(any(), any(), any())).thenReturn(tableEntity);
+    when(tableEntity.id()).thenReturn(1L);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", 
entityStore, true);
+
+    LancePartitionStatisticStorage storage = new 
LancePartitionStatisticStorage(properties);
+
+    try {
+      BufferAllocator allocator = spy(new RootAllocator(Long.MAX_VALUE));
+      FieldUtils.writeField(storage, "allocator", allocator, true);
+
+      Cache<Long, Dataset> datasetCache = storage.getDatasetCache();
+      Assertions.assertNotNull(datasetCache);
+
+      Dataset dataset = mock(Dataset.class);
+      VarCharVector buffer = new VarCharVector("test", allocator);
+      buffer.allocateNew(1024);
+
+      doAnswer(
+              invocation -> {
+                buffer.close();
+                return null;
+              })
+          .when(dataset)
+          .close();
+
+      datasetCache.put(1L, dataset);
+
+      storage.close();
+
+      Assertions.assertEquals(0, allocator.getAllocatedMemory());
+
+      InOrder inOrder = inOrder(dataset, allocator);
+      inOrder.verify(dataset).close();
+      inOrder.verify(allocator).close();
+
+    } finally {
+      FileUtils.deleteDirectory(new File(location));
+    }
+  }
 }

Reply via email to