This is an automated email from the ASF dual-hosted git repository.
sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 137ea80bc34 HDDS-14800. Guard RocksDB iterator against closed DB during volume failure (#9904)
137ea80bc34 is described below
commit 137ea80bc3459c0609e73c7c1a633505464fa028
Author: Priyesh Karatha <[email protected]>
AuthorDate: Fri Mar 20 15:07:12 2026 +0530
HDDS-14800. Guard RocksDB iterator against closed DB during volume failure (#9904)
---
.../ozone/container/metadata/AbstractRDBStore.java | 10 +-
.../TestBackgroundContainerDataScanner.java | 114 ++++++++
.../hdds/utils/db/RDBStoreAbstractIterator.java | 19 +-
.../org/apache/hadoop/hdds/utils/db/RDBTable.java | 4 +
.../apache/hadoop/hdds/utils/db/RocksDatabase.java | 23 +-
.../utils/db/TestRDBStoreIteratorWithDBClose.java | 308 +++++++++++++++++++++
.../utils/db/managed/ManagedRocksIterator.java | 31 ++-
7 files changed, 494 insertions(+), 15 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractRDBStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractRDBStore.java
index 00f23f6958a..d4a16ff2a2b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractRDBStore.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractRDBStore.java
@@ -118,9 +118,9 @@ protected abstract DBStore initDBStore(DBStoreBuilder dbStoreBuilder, ManagedDBO
@Override
public synchronized void stop() {
- if (store != null) {
- store.close();
- store = null;
+ if (this.store != null) {
+ this.store.close();
+ this.store = null;
}
}
@@ -143,8 +143,13 @@ public BatchOperationHandler getBatchHandler() {
}
@Override
- public void close() {
- this.store.close();
- this.cfOptions.close();
+ public synchronized void close() {
+ // Delegate to stop() so close() after stop() does not dereference a null
+ // store; always release cfOptions even if stopping the store fails.
+ try {
+ stop();
+ } finally {
+ this.cfOptions.close();
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
index 508c472a7c8..53598242254 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestBackgroundContainerDataScanner.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.ozone.container.ozoneimpl;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.getHealthyDataScanResult;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.getHealthyMetadataScanResult;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyDataScanResult;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -38,12 +40,19 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -51,9 +60,12 @@
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ScanResult;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
@@ -272,6 +284,115 @@ public void testShutdownDuringScan() throws Exception {
verifyContainerMarkedUnhealthy(healthy, never());
}
+ /**
+ * Scanner shuts down cleanly when volume failure is detected mid-iteration.
+ */
+ @Test
+ public void testVolumeFailureDuringIterationShutsDownScanner()
+ throws Exception {
+ // isFailed() is called twice per container (scanContainer + shouldScanMetadata).
+ // Return false for both checks of the first container, then true after that.
+ when(vol.isFailed()).thenReturn(false, false, true);
+
+ CountDownLatch firstContainerScanned = new CountDownLatch(1);
+ when(healthy.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
+ .then(i -> {
+ firstContainerScanned.countDown();
+ return getHealthyDataScanResult();
+ });
+
+ ContainerDataScannerMetrics metrics = scanner.getMetrics();
+ scanner.start();
+
+ assertTrue(firstContainerScanned.await(5, TimeUnit.SECONDS),
+ "First container should have been scanned");
+
+ long deadline = System.currentTimeMillis() + 5000;
+ while (scanner.isAlive() && System.currentTimeMillis() < deadline) {
+ Thread.sleep(100);
+ }
+
+ // Only the interaction is checked; verify()'s stubbed return value is meaningless.
+ verify(vol, atLeastOnce()).isFailed();
+ assertFalse(scanner.isAlive(),
+ "Scanner thread should have terminated after detecting volume failure");
+ assertEquals(0, metrics.getNumScanIterations(),
+ "No full iteration should have completed after volume failure");
+ verify(corruptData, never()).scanData(any(), any());
+ }
+
+ /**
+ * Scan completes without exception when the underlying DB is closed
+ * concurrently (simulates StorageVolumeChecker calling failVolume() while
+ * BackgroundContainerDataScanner holds an open iterator).
+ */
+ @Test
+ public void testScanExitsCleanlyWhenDBClosedDuringIteration(
+ @TempDir File tempDir) throws Exception {
+ File dbDir = new File(tempDir, "container-db");
+
+ // DatanodeTable disables iterator(), so get the raw RDBTable via the
+ // column family definition for iteration.
+ try (DatanodeStoreSchemaThreeImpl datanodeStore =
+ new DatanodeStoreSchemaThreeImpl(
+ new OzoneConfiguration(), dbDir.getAbsolutePath(), false)) {
+ Table<String, Long> metaTableForPut = datanodeStore.getMetadataTable();
+ for (int i = 0; i < 50; i++) {
+ metaTableForPut.put("key-" + i, (long) i);
+ }
+ Table<String, Long> iterableMetaTable =
+ DatanodeSchemaThreeDBDefinition.METADATA.getTable(datanodeStore.getStore());
+
+ CountDownLatch iteratorOpen = new CountDownLatch(1);
+ CountDownLatch resumeIteration = new CountDownLatch(1);
+
+ when(healthy.scanData(
+ any(DataTransferThrottler.class), any(Canceler.class)))
+ .then(invocation -> {
+ try (Table.KeyValueIterator<String, Long> iter =
+ iterableMetaTable.iterator()) {
+ iteratorOpen.countDown();
+ assertTrue(resumeIteration.await(5, TimeUnit.SECONDS),
+ "resumeIteration latch should have been released");
+ while (iter.hasNext()) {
+ iter.next();
+ }
+ }
+ return getHealthyDataScanResult();
+ });
+
+ ExecutorService scanExecutor = Executors.newSingleThreadExecutor();
+ ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
+ try {
+ Future<?> scanFuture = scanExecutor.submit(() -> scanner.runIteration());
+
+ assertTrue(iteratorOpen.await(5, TimeUnit.SECONDS),
+ "Iterator should have been opened inside scanData()");
+
+ // Simulate failVolume() on a separate thread
+ Future<?> closeFuture = closeExecutor.submit((Callable<Void>) () -> {
+ datanodeStore.stop();
+ return null;
+ });
+ // Give the close thread time to start waiting on the still-open iterator.
+ Thread.sleep(50);
+
+ resumeIteration.countDown();
+
+ assertDoesNotThrow(() -> scanFuture.get(5, TimeUnit.SECONDS),
+ "Scan must complete without exception when DB is closed concurrently");
+ assertDoesNotThrow(() -> closeFuture.get(5, TimeUnit.SECONDS),
+ "DB close must complete after iterator is released");
+ } finally {
+ // Release the scanner even if an assertion failed, so its iterator is
+ // closed and the pending stop() can finish before the store is closed.
+ resumeIteration.countDown();
+ scanExecutor.shutdown();
+ closeExecutor.shutdown();
+ }
+ }
+ }
+
@Test
public void testMerkleTreeWritten() throws Exception {
scanner.runIteration();
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
index c6ca7247fdc..9ebaf368e9f 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.utils.db;
import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.slf4j.Logger;
@@ -40,9 +41,14 @@ abstract class RDBStoreAbstractIterator<RAW>
// This is for schemas that use a fixed-length
// prefix for each key.
private final RAW prefix;
-
private final IteratorType type;
+ // Makes close() idempotent: the wrapped iterator is closed at most once.
+ private final AtomicBoolean isIteratorClosed = new AtomicBoolean(false);
+ /**
+ * Wraps {@code iterator}, which this object closes in {@link #close()}.
+ * Callers must obtain the iterator with try-with-resources or close it in a
+ * finally block: the wrapped iterator may hold a reference on the database,
+ * and leaking it would keep a pending database close waiting indefinitely.
+ */
RDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table, RAW
prefix, IteratorType type) {
this.rocksDBIterator = iterator;
this.rocksDBTable = table;
@@ -89,6 +95,10 @@ public final void forEachRemaining(
}
}
+ /**
+ * Returns true if the backing table reports its database closed.
+ * A null table is treated as open.
+ */
+ private boolean isDbClosed() {
+ return rocksDBTable != null && rocksDBTable.isClosed();
+ }
+
private void setCurrentEntry() {
if (rocksDBIterator.get().isValid()) {
currentEntry = getKeyValue();
@@ -99,6 +109,9 @@ private void setCurrentEntry() {
@Override
public final boolean hasNext() {
+ // Report no more entries once the database reports it is closed, so scans
+ // end promptly and the caller goes on to close this iterator.
+ if (isDbClosed()) {
+ return false;
+ }
return rocksDBIterator.get().isValid() &&
(prefix == null || startsWithPrefix(key()));
}
@@ -154,6 +167,8 @@ public final void removeFromDB() throws
RocksDatabaseException, CodecException {
@Override
public void close() {
- rocksDBIterator.close();
+ // Only the first call closes the wrapped iterator; later calls are no-ops.
+ if (isIteratorClosed.compareAndSet(false, true)) {
+ rocksDBIterator.close();
+ }
}
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 2aef5daa3c9..7d8d2f5af57 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -224,6 +224,10 @@ KeyValueIterator<CodecBuffer, CodecBuffer> iterator(
this, prefix, type);
}
+ /**
+ * Whether the database backing this table reports itself closed.
+ * Iterators over this table check it so they can stop early during a DB close.
+ */
+ boolean isClosed() {
+ return db.isClosed();
+ }
+
@Override
public String getName() {
return family.getName();
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index 5aff9351804..f344ad95e55 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -396,7 +396,9 @@ private void close(boolean isSync) {
}
private void waitAndClose() {
- // wait till all access to rocks db is process to avoid crash while close
+ // Wait until all active operations (including open iterators) complete.
+ // Iterators that were already open when close was triggered see hasNext()
+ // return false, so well-behaved scans end and release their reference.
while (!counter.compareAndSet(0, Long.MIN_VALUE)) {
try {
Thread.currentThread().sleep(1);
@@ -427,7 +429,7 @@ private boolean shouldClose(RocksDBException e) {
}
}
- private UncheckedAutoCloseable acquire() throws RocksDatabaseException {
+ UncheckedAutoCloseable acquire() throws RocksDatabaseException {
if (isClosed()) {
throw new RocksDatabaseException("Rocks Database is closed");
}
@@ -770,17 +772,39 @@ public long getLatestSequenceNumber() throws RocksDatabaseException {
public ManagedRocksIterator newIterator(ColumnFamily family)
throws RocksDatabaseException {
- try (UncheckedAutoCloseable ignored = acquire()) {
- return managed(db.get().newIterator(family.getHandle()));
+ // The returned iterator owns this reference and releases it on close(),
+ // so the DB cannot be physically closed while the iterator is in use.
+ final UncheckedAutoCloseable ref = acquire();
+ boolean success = false;
+ try {
+ final ManagedRocksIterator iterator =
+ managed(db.get().newIterator(family.getHandle()), ref);
+ success = true;
+ return iterator;
+ } finally {
+ // Release on any failure (including Errors); a leaked reference would
+ // leave waitAndClose() spinning forever.
+ if (!success) {
+ ref.close();
+ }
}
}
public ManagedRocksIterator newIterator(ColumnFamily family,
boolean fillCache) throws RocksDatabaseException {
- try (UncheckedAutoCloseable ignored = acquire();
- ManagedReadOptions readOptions = new ManagedReadOptions()) {
+ final UncheckedAutoCloseable ref = acquire();
+ boolean success = false;
+ try (ManagedReadOptions readOptions = new ManagedReadOptions()) {
readOptions.setFillCache(fillCache);
- return managed(db.get().newIterator(family.getHandle(), readOptions));
+ final ManagedRocksIterator iterator =
+ managed(db.get().newIterator(family.getHandle(), readOptions), ref);
+ success = true;
+ return iterator;
+ } finally {
+ // Hand the reference to the iterator on success; release it otherwise.
+ if (!success) {
+ ref.close();
+ }
}
}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIteratorWithDBClose.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIteratorWithDBClose.java
new file mode 100644
index 00000000000..b0484fba393
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIteratorWithDBClose.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static org.apache.hadoop.hdds.utils.db.IteratorType.KEY_AND_VALUE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.RocksDB;
+
+/**
+ * Tests that RDBStoreAbstractIterator handles concurrent DB close safely.
+ */
+public class TestRDBStoreIteratorWithDBClose {
+
+ private static final String TABLE_NAME = "TestTable";
+ private static final int ENTRY_COUNT = 100;
+
+ private RDBStore rdbStore;
+ private ManagedDBOptions options;
+
+ @BeforeEach
+ public void setUp(@TempDir File tempDir) throws Exception {
+ options = TestRDBStore.newManagedDBOptions();
+ Set<TableConfig> configSet = new HashSet<>();
+ // RocksDB always requires the default column family to be present
+ configSet.add(new TableConfig(
+ StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
+ new ManagedColumnFamilyOptions()));
+ configSet.add(new TableConfig(TABLE_NAME, new ManagedColumnFamilyOptions()));
+ rdbStore = TestRDBStore.newRDBStore(tempDir, options, configSet);
+
+ RDBTable table = rdbStore.getTable(TABLE_NAME);
+ for (int i = 0; i < ENTRY_COUNT; i++) {
+ byte[] key = ("key-" + i).getBytes(StandardCharsets.UTF_8);
+ byte[] value = ("value-" + i).getBytes(StandardCharsets.UTF_8);
+ table.put(key, value);
+ }
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (rdbStore != null && !rdbStore.isClosed()) {
+ rdbStore.close();
+ }
+ if (options != null) {
+ options.close();
+ }
+ }
+
+ /**
+ * Validates the fast-fail check (isClosed guard).
+ */
+ @Test
+ public void testHasNextReturnsFalseAfterDBClosed() throws Exception {
+ RDBTable table = rdbStore.getTable(TABLE_NAME);
+
+ int threadCount = 10;
+ // Each thread iterates for up to 60 s; with 10 ms per entry x 100 entries
+ // one pass takes ~1 s, so threads are expected mid-scan at the 500 ms close.
+ long iterationCycleMs = 60_000;
+ long closeAfterMs = 500;
+ int sleepPerEntryMs = 10;
+
+ CountDownLatch allStarted = new CountDownLatch(threadCount);
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ List<Future<Void>> futures = new ArrayList<>();
+
+ for (int t = 0; t < threadCount; t++) {
+ futures.add(executor.submit((Callable<Void>) () -> {
+ try (Table.KeyValueIterator<byte[], byte[]> iter =
+ table.iterator((byte[]) null, KEY_AND_VALUE)) {
+ allStarted.countDown();
+ long deadline = System.currentTimeMillis() + iterationCycleMs;
+ while (System.currentTimeMillis() < deadline) {
+ while (iter.hasNext()) {
+ iter.next();
+ Thread.sleep(sleepPerEntryMs);
+ }
+ // hasNext() returned false: either DB was closed or natural table end.
+ // isClosed() is package-private on RDBTable, accessible from this package.
+ if (table.isClosed()) {
+ break; // DB closed — exit cleanly
+ }
+ // Natural end of table — seek back for another pass
+ iter.seekToFirst();
+ }
+ }
+ return null;
+ }));
+ }
+
+ assertTrue(allStarted.await(10, TimeUnit.SECONDS),
+ "All scanner threads should start within 10 seconds");
+
+ // Let every thread get mid-scan, then close the DB
+ Thread.sleep(closeAfterMs);
+ rdbStore.close();
+
+ // Every thread must detect the close via hasNext() → false and finish cleanly
+ for (Future<Void> future : futures) {
+ assertDoesNotThrow(() -> future.get(15, TimeUnit.SECONDS),
+ "Each scanner thread should complete cleanly after DB close");
+ }
+
+ executor.shutdown();
+ }
+
+ /**
+ * Validates the acquire/release mechanism.
+ */
+ @Test
+ public void testDBPhysicalCloseWaitsForIterator() throws Exception {
+ RDBTable table = rdbStore.getTable(TABLE_NAME);
+ Table.KeyValueIterator<byte[], byte[]> iter =
+ table.iterator((byte[]) null, KEY_AND_VALUE);
+
+ AtomicBoolean dbCloseCompleted = new AtomicBoolean(false);
+ CountDownLatch closeStarted = new CountDownLatch(1);
+
+ // Background thread simulates failVolume() → closeDbStore()
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future<?> closeFuture = executor.submit(() -> {
+ closeStarted.countDown();
+ rdbStore.close(); // blocks in waitAndClose() until counter == 0
+ dbCloseCompleted.set(true);
+ });
+
+ // Wait for close thread to start and reach waitAndClose()
+ assertTrue(closeStarted.await(5, TimeUnit.SECONDS));
+ Thread.sleep(50);
+
+ // DB must NOT be physically closed yet — iterator holds dbRef (counter > 0)
+ assertFalse(dbCloseCompleted.get(),
+ "DB should not be physically closed while iterator is still open");
+
+ // Release the iterator — decrements RocksDatabase.counter to 0
+ iter.close();
+
+ // Now waitAndClose() can proceed — DB physically closes
+ closeFuture.get(5, TimeUnit.SECONDS);
+ assertTrue(dbCloseCompleted.get(),
+ "DB should be physically closed after iterator is released");
+
+ executor.shutdown();
+ }
+
+ /**
+ * Validates the end-to-end race scenario.
+ *
+ * BackgroundContainerDataScanner iterates while StorageVolumeChecker
+ * concurrently triggers failVolume(). The scan must exit cleanly without
+ * any exception or native crash — hasNext() returns false once the DB is
+ * closed, and the iterator releases its dbRef allowing the DB to close.
+ */
+ @Test
+ public void testConcurrentDBCloseAndScanExitsCleanly() throws Exception {
+ RDBTable table = rdbStore.getTable(TABLE_NAME);
+
+ CountDownLatch scanStarted = new CountDownLatch(1);
+ AtomicBoolean scanCompleted = new AtomicBoolean(false);
+
+ // Scanner thread — simulates BackgroundContainerDataScanner
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future<?> scanFuture = executor.submit((Callable<Void>) () -> {
+ try (Table.KeyValueIterator<byte[], byte[]> iter =
+ table.iterator((byte[]) null, KEY_AND_VALUE)) {
+ scanStarted.countDown();
+ while (iter.hasNext()) {
+ iter.next();
+ Thread.sleep(1); // slow scan to maximise chance of DB close racing
+ }
+ }
+ scanCompleted.set(true);
+ return null;
+ });
+
+ // Wait for scan to start, then trigger failVolume() concurrently
+ assertTrue(scanStarted.await(5, TimeUnit.SECONDS));
+ rdbStore.close(); // simulates failVolume() → closeDbStore()
+
+ assertDoesNotThrow(() -> scanFuture.get(10, TimeUnit.SECONDS),
+ "Scan should exit cleanly without throwing when DB is closed concurrently");
+ assertTrue(scanCompleted.get(),
+ "Scan loop should complete (via hasNext() returning false), not hang");
+
+ executor.shutdown();
+ }
+
+ /**
+ * Verifies that forEachRemaining() exits cleanly without errors when the database is closed
+ * concurrently during iteration.
+ */
+ @Test
+ public void testForEachRemainingExitsCleanlyOnConcurrentDBClose() throws Exception {
+ RDBTable table = rdbStore.getTable(TABLE_NAME);
+
+ int threadCount = 5;
+ CountDownLatch allStarted = new CountDownLatch(threadCount);
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ List<Future<Integer>> futures = new ArrayList<>();
+
+ for (int t = 0; t < threadCount; t++) {
+ futures.add(executor.submit(() -> {
+ AtomicInteger seen = new AtomicInteger(0);
+ try (Table.KeyValueIterator<byte[], byte[]> iter =
+ table.iterator((byte[]) null, KEY_AND_VALUE)) {
+ allStarted.countDown();
+ // forEachRemaining() is the exact "while (hasNext()) { next(); }" loop
+ // that the race condition can affect.
+ iter.forEachRemaining(kv -> {
+ seen.incrementAndGet();
+ try {
+ Thread.sleep(1); // slow scan to maximise race window
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ }
+ return seen.get();
+ }));
+ }
+
+ assertTrue(allStarted.await(10, TimeUnit.SECONDS),
+ "All scanner threads should start within 10 seconds");
+
+ // Close the DB while threads are mid-scan; this is the race trigger.
+ rdbStore.close();
+
+ for (Future<Integer> future : futures) {
+ assertDoesNotThrow(() -> {
+ int count = future.get(10, TimeUnit.SECONDS);
+ assertTrue(count >= 0 && count <= ENTRY_COUNT,
+ "Thread should have observed between 0 and " + ENTRY_COUNT + " entries");
+ }, "forEachRemaining must exit cleanly without NoSuchElementException");
+ }
+
+ executor.shutdown();
+ }
+
+ /**
+ * Validates that closing an iterator after DB close does not throw.
+ */
+ @Test
+ public void testIteratorCloseAfterDBCloseDoesNotThrow() throws Exception {
+ RDBTable table = rdbStore.getTable(TABLE_NAME);
+ Table.KeyValueIterator<byte[], byte[]> iter =
+ table.iterator((byte[]) null, KEY_AND_VALUE);
+
+ CountDownLatch closeStarted = new CountDownLatch(1);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ // Simulate failVolume() from StorageVolumeChecker's thread
+ Future<?> closeFuture = executor.submit(() -> {
+ closeStarted.countDown();
+ rdbStore.close(); // blocks until iter.close() releases dbRef
+ });
+
+ // Wait for close thread to mark isClosed = true
+ assertTrue(closeStarted.await(5, TimeUnit.SECONDS));
+ Thread.sleep(50);
+
+ // iter.close() is called after DB is marked closed — must not throw
+ assertDoesNotThrow(iter::close,
+ "Iterator close() after DB close must not throw");
+
+ // iter.close() decremented counter to 0 — close thread can now finish
+ closeFuture.get(5, TimeUnit.SECONDS);
+ executor.shutdown();
+ }
+}
diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksIterator.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksIterator.java
index 512df4a8e73..20cf84ce542 100644
--- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksIterator.java
+++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksIterator.java
@@ -17,18 +17,52 @@
package org.apache.hadoop.hdds.utils.db.managed;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ratis.util.UncheckedAutoCloseable;
import org.rocksdb.RocksIterator;
/**
* Managed RocksIterator.
+ *
+ * <p>When constructed with a {@code dbRef} (an acquired reference to the
+ * underlying RocksDatabase counter), the iterator holds that reference for its
+ * entire lifetime and releases it in {@link #close()}. This guarantees the DB
+ * cannot be physically destroyed (via waitAndClose) while the iterator is open,
+ * eliminating any TOCTOU race between iterator creation and use.
+ * The reference is released at most once, even if {@code close()} is repeated.
*/
public class ManagedRocksIterator extends ManagedObject<RocksIterator> {
- public ManagedRocksIterator(RocksIterator original) {
+ private final UncheckedAutoCloseable dbRef;
+ private final AtomicBoolean dbRefReleased = new AtomicBoolean(false);
+
+ public ManagedRocksIterator(RocksIterator original, UncheckedAutoCloseable dbRef) {
super(original);
+ this.dbRef = dbRef;
+ }
+
+ public ManagedRocksIterator(RocksIterator original) {
+ this(original, null);
+ }
+
+ @Override
+ public void close() {
+ try {
+ super.close();
+ } finally {
+ // Guard the release: a repeated close() must not release the DB reference
+ // again, which would unbalance the RocksDatabase reference counter.
+ if (dbRef != null && dbRefReleased.compareAndSet(false, true)) {
+ dbRef.close();
+ }
+ }
}
public static ManagedRocksIterator managed(RocksIterator iterator) {
return new ManagedRocksIterator(iterator);
}
+
+ public static ManagedRocksIterator managed(RocksIterator iterator, UncheckedAutoCloseable dbRef) {
+ return new ManagedRocksIterator(iterator, dbRef);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]