This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 137ea80bc34 HDDS-14800. Guard RocksDB iterator against closed DB during volume failure (#9904)
137ea80bc34 is described below

commit 137ea80bc3459c0609e73c7c1a633505464fa028
Author: Priyesh Karatha <[email protected]>
AuthorDate: Fri Mar 20 15:07:12 2026 +0530

    HDDS-14800. Guard RocksDB iterator against closed DB during volume failure (#9904)
---
 .../ozone/container/metadata/AbstractRDBStore.java |  10 +-
 .../TestBackgroundContainerDataScanner.java        | 114 ++++++++
 .../hdds/utils/db/RDBStoreAbstractIterator.java    |  19 +-
 .../org/apache/hadoop/hdds/utils/db/RDBTable.java  |   4 +
 .../apache/hadoop/hdds/utils/db/RocksDatabase.java |  23 +-
 .../utils/db/TestRDBStoreIteratorWithDBClose.java  | 308 +++++++++++++++++++++
 .../utils/db/managed/ManagedRocksIterator.java     |  31 ++-
 7 files changed, 494 insertions(+), 15 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractRDBStore.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractRDBStore.java
index 00f23f6958a..d4a16ff2a2b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractRDBStore.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractRDBStore.java
@@ -118,9 +118,9 @@ protected abstract DBStore initDBStore(DBStoreBuilder 
dbStoreBuilder, ManagedDBO
 
   @Override
   public synchronized void stop() {
-    if (store != null) {
-      store.close();
-      store = null;
+    // Close the store at most once: the field is cleared after closing, so a
+    // repeated stop() (including the one made by close()) skips it.
+    if (this.store != null) {
+      this.store.close();
+      this.store = null;
     }
   }
 
@@ -143,8 +143,8 @@ public BatchOperationHandler getBatchHandler() {
   }
 
   @Override
-  public void close() {
-    this.store.close();
+  public synchronized void close() {
+    // stop() skips the store if it was already stopped; the column family
+    // options are closed here in either case.
+    stop();
     this.cfOptions.close();
   }
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
index 508c472a7c8..53598242254 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
@@ -18,8 +18,10 @@
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getHealthyDataScanResult;
 import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getHealthyMetadataScanResult;
 import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyDataScanResult;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -38,12 +40,19 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Optional;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -51,9 +60,12 @@
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import 
org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
@@ -272,6 +284,108 @@ public void testShutdownDuringScan() throws Exception {
     verifyContainerMarkedUnhealthy(healthy, never());
   }
 
+  /**
+   * Scanner shuts down cleanly when volume failure is detected mid-iteration.
+   */
+  @Test
+  public void testVolumeFailureDuringIterationShutdownsScanner()
+      throws Exception {
+    // isFailed() is called twice per container (scanContainer + shouldScanMetadata).
+    // Return false for both checks of the first container, then true after that.
+    when(vol.isFailed()).thenReturn(false, false, true);
+
+    CountDownLatch firstContainerScanned = new CountDownLatch(1);
+    when(healthy.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
+        .then(i -> {
+          firstContainerScanned.countDown();
+          return getHealthyDataScanResult();
+        });
+
+    ContainerDataScannerMetrics metrics = scanner.getMetrics();
+    scanner.start();
+
+    assertTrue(firstContainerScanned.await(5, TimeUnit.SECONDS),
+        "First container should have been scanned");
+
+    long deadline = System.currentTimeMillis() + 5000;
+    while (scanner.isAlive() && System.currentTimeMillis() < deadline) {
+      Thread.sleep(100);
+    }
+
+    // The volume's health must have been consulted at least once. verify()
+    // returns the mock, so its isFailed() result is not meaningful here.
+    verify(vol, atLeastOnce()).isFailed();
+    assertFalse(scanner.isAlive(),
+        "Scanner thread should have terminated after detecting volume failure");
+    assertEquals(0, metrics.getNumScanIterations(),
+        "No full iteration should have completed after volume failure");
+    verify(corruptData, never()).scanData(any(), any());
+  }
+
+  /**
+   * Scan completes without exception when the underlying DB is closed
+   * concurrently (simulates StorageVolumeChecker calling failVolume() while
+   * BackgroundContainerDataScanner holds an open iterator).
+   */
+  @Test
+  public void testScanExitsCleanlyWhenDBClosedDuringIteration(
+      @TempDir File tempDir) throws Exception {
+    File dbDir = new File(tempDir, "container-db");
+
+    // DatanodeTable disables iterator(), so get the raw RDBTable via the
+    // column family definition for iteration.
+    try (DatanodeStoreSchemaThreeImpl datanodeStore =
+        new DatanodeStoreSchemaThreeImpl(
+            new OzoneConfiguration(), dbDir.getAbsolutePath(), false)) {
+      Table<String, Long> metaTableForPut = datanodeStore.getMetadataTable();
+      for (int i = 0; i < 50; i++) {
+        metaTableForPut.put("key-" + i, (long) i);
+      }
+      Table<String, Long> iterableMetaTable =
+          DatanodeSchemaThreeDBDefinition.METADATA.getTable(datanodeStore.getStore());
+
+      CountDownLatch iteratorOpen = new CountDownLatch(1);
+      CountDownLatch resumeIteration = new CountDownLatch(1);
+
+      when(healthy.scanData(
+          any(DataTransferThrottler.class), any(Canceler.class)))
+          .then(invocation -> {
+            try (Table.KeyValueIterator<String, Long> iter =
+                iterableMetaTable.iterator()) {
+              iteratorOpen.countDown();
+              assertTrue(resumeIteration.await(5, TimeUnit.SECONDS),
+                  "resumeIteration latch should have been released");
+              while (iter.hasNext()) {
+                iter.next();
+              }
+            }
+            return getHealthyDataScanResult();
+          });
+
+      ExecutorService scanExecutor = Executors.newSingleThreadExecutor();
+      ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
+      try {
+        Future<?> scanFuture = scanExecutor.submit(() -> scanner.runIteration());
+
+        assertTrue(iteratorOpen.await(5, TimeUnit.SECONDS),
+            "Iterator should have been opened inside scanData()");
+
+        // Simulate failVolume() on a separate thread
+        Future<?> closeFuture = closeExecutor.submit((Callable<Void>) () -> {
+          datanodeStore.stop();
+          return null;
+        });
+        // Give the close thread a moment to start while the iterator is open.
+        Thread.sleep(50);
+
+        resumeIteration.countDown();
+
+        assertDoesNotThrow(() -> scanFuture.get(5, TimeUnit.SECONDS),
+            "Scan must complete without exception when DB is closed concurrently");
+        assertDoesNotThrow(() -> closeFuture.get(5, TimeUnit.SECONDS),
+            "DB close must complete after iterator is released");
+      } finally {
+        // Unblock the scan thread even if an assertion above failed, so it can
+        // close its iterator before the store itself is closed below.
+        resumeIteration.countDown();
+        scanExecutor.shutdownNow();
+        closeExecutor.shutdownNow();
+      }
+    }
+  }
+
   @Test
   public void testMerkleTreeWritten() throws Exception {
     scanner.runIteration();
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
index c6ca7247fdc..9ebaf368e9f 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.utils.db;
 
 import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
 import org.slf4j.Logger;
@@ -40,9 +41,14 @@ abstract class RDBStoreAbstractIterator<RAW>
   // This is for schemas that use a fixed-length
   // prefix for each key.
   private final RAW prefix;
-
   private final IteratorType type;
+  private final AtomicBoolean isIteratorClosed = new AtomicBoolean(false);
 
+  /**
+   * Constructor for RDBStoreAbstractIterator.
+   * Callers must ensure that the iterator is always obtained using try-with-resources
+   * or always closed in a finally block to ensure accurate refcounting.
+   */
   RDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table, RAW 
prefix, IteratorType type) {
     this.rocksDBIterator = iterator;
     this.rocksDBTable = table;
@@ -89,6 +95,10 @@ public final void forEachRemaining(
     }
   }
 
+  /** True when a table is attached and its database reports that it is closed. */
+  private boolean isDbClosed() {
+    return rocksDBTable != null && rocksDBTable.isClosed();
+  }
+
   private void setCurrentEntry() {
     if (rocksDBIterator.get().isValid()) {
       currentEntry = getKeyValue();
@@ -99,6 +109,9 @@ private void setCurrentEntry() {
 
   @Override
   public final boolean hasNext() {
+    // Report no further entries once the DB is marked closed, so callers leave
+    // their iteration loop promptly and close this iterator.
+    if (isDbClosed()) {
+      return false;
+    }
     return rocksDBIterator.get().isValid() &&
         (prefix == null || startsWithPrefix(key()));
   }
@@ -154,6 +167,8 @@ public final void removeFromDB() throws 
RocksDatabaseException, CodecException {
 
   @Override
   public void close() {
-    rocksDBIterator.close();
+    // Close the underlying iterator only once, even if close() is called again.
+    if (isIteratorClosed.compareAndSet(false, true)) {
+      rocksDBIterator.close();
+    }
   }
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 2aef5daa3c9..7d8d2f5af57 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -224,6 +224,10 @@ KeyValueIterator<CodecBuffer, CodecBuffer> iterator(
         this, prefix, type);
   }
 
+  /**
+   * Returns whether the underlying database reports that it is closed.
+   * Iterators over this table use it to stop iterating once a close is triggered.
+   */
+  boolean isClosed() {
+    return db.isClosed();
+  }
+
   @Override
   public String getName() {
     return family.getName();
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index 5aff9351804..f344ad95e55 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -396,7 +396,9 @@ private void close(boolean isSync) {
   }
 
   private void waitAndClose() {
-    // wait till all access to rocks db is process to avoid crash while close
+    // Wait until all active operations (including open iterators) complete.
+    // Iterators acquired after DB close is triggered will fast-fail in
+    // hasNext(), so this loop is expected to complete quickly in practice.
     while (!counter.compareAndSet(0, Long.MIN_VALUE)) {
       try {
         Thread.currentThread().sleep(1);
@@ -427,7 +429,7 @@ private boolean shouldClose(RocksDBException e) {
     }
   }
 
-  private UncheckedAutoCloseable acquire() throws RocksDatabaseException {
+  UncheckedAutoCloseable acquire() throws RocksDatabaseException {
     if (isClosed()) {
       throw new RocksDatabaseException("Rocks Database is closed");
     }
@@ -770,17 +772,29 @@ public long getLatestSequenceNumber() throws RocksDatabaseException {
 
   public ManagedRocksIterator newIterator(ColumnFamily family)
       throws RocksDatabaseException {
+    // The reference is owned by the returned iterator (released on close);
+    // release it here only if creating the iterator fails.
+    final UncheckedAutoCloseable ref = acquire();
+    try {
+      return managed(db.get().newIterator(family.getHandle()), ref);
+    } catch (Throwable t) {
+      // Catch Throwable, not just RuntimeException: a leaked reference keeps
+      // the counter above zero and waitAndClose() would keep waiting.
+      ref.close();
+      throw t;
     }
   }
 
   public ManagedRocksIterator newIterator(ColumnFamily family,
       boolean fillCache) throws RocksDatabaseException {
+    // Same reference-ownership contract as newIterator(ColumnFamily).
+    final UncheckedAutoCloseable ref = acquire();
+    try (ManagedReadOptions readOptions = new ManagedReadOptions()) {
       readOptions.setFillCache(fillCache);
+      return managed(db.get().newIterator(family.getHandle(), readOptions), ref);
+    } catch (Throwable t) {
+      ref.close();
+      throw t;
     }
   }
 
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIteratorWithDBClose.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIteratorWithDBClose.java
new file mode 100644
index 00000000000..b0484fba393
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIteratorWithDBClose.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static org.apache.hadoop.hdds.utils.db.IteratorType.KEY_AND_VALUE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.RocksDB;
+
+/**
+ * Tests that RDBStoreAbstractIterator handles concurrent DB close safely.
+ */
+public class TestRDBStoreIteratorWithDBClose {
+
+  private static final String TABLE_NAME = "TestTable";
+  private static final int ENTRY_COUNT = 100;
+  // Upper bound for waiting on cross-thread state changes in these tests.
+  private static final long WAIT_TIMEOUT_MS = 5_000;
+
+  private RDBStore rdbStore;
+  private ManagedDBOptions options;
+
+  @BeforeEach
+  public void setUp(@TempDir File tempDir) throws Exception {
+    options = TestRDBStore.newManagedDBOptions();
+    Set<TableConfig> configSet = new HashSet<>();
+    // RocksDB always requires the default column family to be present
+    configSet.add(new TableConfig(
+        StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
+        new ManagedColumnFamilyOptions()));
+    configSet.add(new TableConfig(TABLE_NAME, new ManagedColumnFamilyOptions()));
+    rdbStore = TestRDBStore.newRDBStore(tempDir, options, configSet);
+
+    RDBTable table = rdbStore.getTable(TABLE_NAME);
+    for (int i = 0; i < ENTRY_COUNT; i++) {
+      byte[] key = ("key-" + i).getBytes(StandardCharsets.UTF_8);
+      byte[] value = ("value-" + i).getBytes(StandardCharsets.UTF_8);
+      table.put(key, value);
+    }
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    if (rdbStore != null && !rdbStore.isClosed()) {
+      rdbStore.close();
+    }
+    if (options != null) {
+      options.close();
+    }
+  }
+
+  /**
+   * Polls until {@code table} reports its database as closed (i.e. a close has
+   * been triggered), instead of relying on a fixed sleep.
+   *
+   * @return true if the closed state was observed within {@code timeoutMs}
+   */
+  private static boolean awaitDbMarkedClosed(RDBTable table, long timeoutMs)
+      throws InterruptedException {
+    long deadline = System.currentTimeMillis() + timeoutMs;
+    while (!table.isClosed()) {
+      if (System.currentTimeMillis() >= deadline) {
+        return false;
+      }
+      Thread.sleep(1);
+    }
+    return true;
+  }
+
+  /**
+   * Validates the fast-fail check (isClosed guard).
+   */
+  @Test
+  public void testHasNextReturnsFalseAfterDBClosed() throws Exception {
+    RDBTable table = rdbStore.getTable(TABLE_NAME);
+
+    int threadCount = 10;
+    // Each thread iterates for up to 60 s; with 20 ms per entry × 100 entries
+    // one pass takes ~2 s, so threads are still mid-scan at the 1 s close.
+    long iterationCycleMs = 60_000;
+    long closeAfterMs = 1_000;
+    int sleepPerEntryMs = 20;
+
+    CountDownLatch allStarted = new CountDownLatch(threadCount);
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+    try {
+      List<Future<Void>> futures = new ArrayList<>();
+      for (int t = 0; t < threadCount; t++) {
+        futures.add(executor.submit((Callable<Void>) () -> {
+          try (Table.KeyValueIterator<byte[], byte[]> iter =
+              table.iterator((byte[]) null, KEY_AND_VALUE)) {
+            allStarted.countDown();
+            long deadline = System.currentTimeMillis() + iterationCycleMs;
+            while (System.currentTimeMillis() < deadline) {
+              while (iter.hasNext()) {
+                iter.next();
+                Thread.sleep(sleepPerEntryMs);
+              }
+              // hasNext() returned false: either DB was closed or natural table end.
+              // isClosed() is package-private on RDBTable, accessible from this package.
+              if (table.isClosed()) {
+                break; // DB closed — exit cleanly
+              }
+              // Natural end of table — seek back for another pass
+              iter.seekToFirst();
+            }
+          }
+          return null;
+        }));
+      }
+
+      assertTrue(allStarted.await(10, TimeUnit.SECONDS),
+          "All scanner threads should start within 10 seconds");
+
+      // All threads are mid-scan; close the DB now
+      Thread.sleep(closeAfterMs);
+      rdbStore.close();
+
+      // Every thread must detect the close via hasNext() → false and finish cleanly
+      for (Future<Void> future : futures) {
+        assertDoesNotThrow(() -> future.get(15, TimeUnit.SECONDS),
+            "Each scanner thread should complete cleanly after DB close");
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Validates the acquire/release mechanism.
+   */
+  @Test
+  public void testDBPhysicalCloseWaitsForIterator() throws Exception {
+    RDBTable table = rdbStore.getTable(TABLE_NAME);
+    Table.KeyValueIterator<byte[], byte[]> iter =
+        table.iterator((byte[]) null, KEY_AND_VALUE);
+
+    AtomicBoolean dbCloseCompleted = new AtomicBoolean(false);
+
+    // Background thread simulates failVolume() → closeDbStore()
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      Future<?> closeFuture = executor.submit(() -> {
+        rdbStore.close();          // blocks in waitAndClose() until counter == 0
+        dbCloseCompleted.set(true);
+      });
+
+      // Wait until the close has actually been triggered, then give a
+      // (hypothetically non-waiting) close time to finish before checking.
+      assertTrue(awaitDbMarkedClosed(table, WAIT_TIMEOUT_MS),
+          "DB close should have been triggered");
+      Thread.sleep(50);
+
+      // DB must NOT be physically closed yet — iterator holds dbRef (counter > 0)
+      assertFalse(dbCloseCompleted.get(),
+          "DB should not be physically closed while iterator is still open");
+
+      // Release the iterator — decrements RocksDatabase.counter to 0
+      iter.close();
+
+      // Now waitAndClose() can proceed — DB physically closes
+      closeFuture.get(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      assertTrue(dbCloseCompleted.get(),
+          "DB should be physically closed after iterator is released");
+    } finally {
+      // No-op if already closed; ensures the close thread is never left
+      // waiting on this iterator's reference when an assertion fails.
+      iter.close();
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Validates the end-to-end race scenario.
+   *
+   * BackgroundContainerDataScanner iterates while StorageVolumeChecker
+   * concurrently triggers failVolume(). The scan must exit cleanly without
+   * any exception or native crash — hasNext() returns false once the DB is
+   * closed, and the iterator releases its dbRef allowing the DB to close.
+   */
+  @Test
+  public void testConcurrentDBCloseAndScanExitsCleanly() throws Exception {
+    RDBTable table = rdbStore.getTable(TABLE_NAME);
+
+    CountDownLatch scanStarted = new CountDownLatch(1);
+    AtomicBoolean scanCompleted = new AtomicBoolean(false);
+
+    // Scanner thread — simulates BackgroundContainerDataScanner
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      Future<?> scanFuture = executor.submit((Callable<Void>) () -> {
+        try (Table.KeyValueIterator<byte[], byte[]> iter =
+            table.iterator((byte[]) null, KEY_AND_VALUE)) {
+          scanStarted.countDown();
+          while (iter.hasNext()) {
+            iter.next();
+            Thread.sleep(1); // slow scan to maximise chance of DB close racing
+          }
+        }
+        scanCompleted.set(true);
+        return null;
+      });
+
+      // Wait for scan to start, then trigger failVolume() concurrently
+      assertTrue(scanStarted.await(5, TimeUnit.SECONDS));
+      rdbStore.close(); // simulates failVolume() → closeDbStore()
+
+      assertDoesNotThrow(() -> scanFuture.get(10, TimeUnit.SECONDS),
+          "Scan should exit cleanly without throwing when DB is closed concurrently");
+      assertTrue(scanCompleted.get(),
+          "Scan loop should complete (via hasNext() returning false), not hang");
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Verifies that forEachRemaining() exits cleanly without errors when the database is closed
+   * concurrently during iteration.
+   */
+  @Test
+  public void testForEachRemainingExitsCleanlyOnConcurrentDBClose() throws Exception {
+    RDBTable table = rdbStore.getTable(TABLE_NAME);
+
+    int threadCount = 5;
+    CountDownLatch allStarted = new CountDownLatch(threadCount);
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+    try {
+      List<Future<Integer>> futures = new ArrayList<>();
+      for (int t = 0; t < threadCount; t++) {
+        futures.add(executor.submit(() -> {
+          AtomicInteger seen = new AtomicInteger(0);
+          try (Table.KeyValueIterator<byte[], byte[]> iter =
+              table.iterator((byte[]) null, KEY_AND_VALUE)) {
+            allStarted.countDown();
+            // forEachRemaining() is the exact "while (hasNext()) { next(); }" loop
+            // that the race condition can affect.
+            iter.forEachRemaining(kv -> {
+              seen.incrementAndGet();
+              try {
+                Thread.sleep(1); // slow scan to maximise race window
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+              }
+            });
+          }
+          return seen.get();
+        }));
+      }
+
+      assertTrue(allStarted.await(10, TimeUnit.SECONDS),
+          "All scanner threads should start within 10 seconds");
+
+      // Close the DB while threads are mid-scan; this is the race trigger.
+      rdbStore.close();
+
+      for (Future<Integer> future : futures) {
+        assertDoesNotThrow(() -> {
+          int count = future.get(10, TimeUnit.SECONDS);
+          assertTrue(count >= 0 && count <= ENTRY_COUNT,
+              "Thread should have observed between 0 and " + ENTRY_COUNT + " entries");
+        }, "forEachRemaining must exit cleanly without NoSuchElementException");
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Validates that closing an iterator after DB close does not throw.
+   */
+  @Test
+  public void testIteratorCloseAfterDBCloseDoesNotThrow() throws Exception {
+    RDBTable table = rdbStore.getTable(TABLE_NAME);
+    Table.KeyValueIterator<byte[], byte[]> iter =
+        table.iterator((byte[]) null, KEY_AND_VALUE);
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      // Simulate failVolume() from StorageVolumeChecker's thread
+      Future<?> closeFuture = executor.submit(() -> {
+        rdbStore.close(); // blocks until iter.close() releases dbRef
+      });
+
+      // Wait until the close thread has actually marked the DB closed
+      assertTrue(awaitDbMarkedClosed(table, WAIT_TIMEOUT_MS),
+          "DB close should have been triggered");
+
+      // iter.close() is called after DB is marked closed — must not throw
+      assertDoesNotThrow(iter::close,
+          "Iterator close() after DB close must not throw");
+
+      // iter.close() decremented counter to 0 — close thread can now finish
+      closeFuture.get(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    } finally {
+      iter.close(); // no-op if already closed
+      executor.shutdownNow();
+    }
+  }
+}
diff --git 
a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksIterator.java
 
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksIterator.java
index 512df4a8e73..20cf84ce542 100644
--- 
a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksIterator.java
+++ 
b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksIterator.java
@@ -17,18 +17,59 @@
 
 package org.apache.hadoop.hdds.utils.db.managed;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ratis.util.UncheckedAutoCloseable;
 import org.rocksdb.RocksIterator;
 
 /**
  * Managed RocksIterator.
+ *
+ * <p>When constructed with a {@code dbRef} (an acquired reference to the
+ * underlying RocksDatabase counter), the iterator holds that reference for its
+ * entire lifetime and releases it in {@link #close()}. This guarantees the DB
+ * cannot be physically destroyed (via waitAndClose) while the iterator is open,
+ * eliminating any TOCTOU race between iterator creation and use.
+ *
+ * <p>The reference is released at most once, even if {@link #close()} is
+ * invoked repeatedly, so the database reference count is never decremented
+ * more than once per iterator.
  */
 public class ManagedRocksIterator extends ManagedObject<RocksIterator> {
 
-  public ManagedRocksIterator(RocksIterator original) {
+  private final UncheckedAutoCloseable dbRef;
+  // Guards the single release of dbRef; the handle returned by the caller's
+  // acquire() is not assumed to be idempotent itself.
+  private final AtomicBoolean dbRefReleased = new AtomicBoolean(false);
+
+  public ManagedRocksIterator(RocksIterator original, UncheckedAutoCloseable dbRef) {
     super(original);
+    this.dbRef = dbRef;
+  }
+
+  public ManagedRocksIterator(RocksIterator original) {
+    this(original, null);
+  }
+
+  /**
+   * Closes the native iterator, then releases {@code dbRef} (if any) at most
+   * once, even when this method is called again or super.close() throws.
+   */
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } finally {
+      if (dbRef != null && dbRefReleased.compareAndSet(false, true)) {
+        dbRef.close();
+      }
+    }
   }
 
   public static ManagedRocksIterator managed(RocksIterator iterator) {
     return new ManagedRocksIterator(iterator);
   }
+
+  public static ManagedRocksIterator managed(RocksIterator iterator, UncheckedAutoCloseable dbRef) {
+    return new ManagedRocksIterator(iterator, dbRef);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to