This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0be30ef55e6 atomic move segment files to staging location prior to
delete (#18696)
0be30ef55e6 is described below
commit 0be30ef55e65e6ccd26f5444a6f8a11268831d74
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Oct 27 11:08:18 2025 -0700
atomic move segment files to staging location prior to delete (#18696)
* atomically move segment files to a staging location prior to drop, to ensure
no partial segment files are left behind if a failure occurs during a delete operation
---
.../msq/exec/TaskDataSegmentProviderTest.java | 4 +-
.../segment/loading/SegmentLocalCacheManager.java | 54 ++++++++++++++++++----
.../SegmentLocalCacheManagerConcurrencyTest.java | 5 +-
3 files changed, 50 insertions(+), 13 deletions(-)
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
index f22e7cb4fe0..56cea64175a 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
@@ -214,9 +214,9 @@ class TaskDataSegmentProviderTest extends
InitializedNullHandlingTest
Assertions.assertTrue(FutureUtils.getUnchecked(testFuture, false), "Test
iteration #" + i);
}
- // Cache dir should exist, but be empty, since we've closed all holders.
+ // Cache dir should exist, but be (mostly) empty, since we've closed all
holders.
Assertions.assertTrue(cacheDir.exists());
- Assertions.assertEquals(List.of(), List.of(cacheDir.list()));
+ Assertions.assertEquals(List.of("__drop"), List.of(cacheDir.list()));
}
private class TestCoordinatorClientImpl extends NoopCoordinatorClient
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 67c30e779e7..04cb3dbca98 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -71,6 +71,8 @@ import java.util.function.Supplier;
*/
public class SegmentLocalCacheManager implements SegmentCacheManager
{
+ private static final String DROP_PATH = "__drop";
+
@VisibleForTesting
static final String DOWNLOAD_START_MARKER_FILE_NAME = "downloadStartMarker";
@@ -191,6 +193,25 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
);
}
+ // clean up any dropping files
+ for (StorageLocation location : locations) {
+ File dropFiles = new File(location.getPath(), DROP_PATH);
+ if (dropFiles.exists()) {
+ final File[] dropping = dropFiles.listFiles();
+ if (dropping != null) {
+ log.debug("cleaning up[%s] segments in[%s]", dropping.length,
dropFiles);
+ for (File droppedFile : dropping) {
+ try {
+ FileUtils.deleteDirectory(droppedFile);
+ }
+ catch (Exception e) {
+ log.warn(e, "Unable to remove dropped segment directory[%s]",
droppedFile);
+ }
+ }
+ }
+ }
+ }
+
final List<DataSegment> cachedSegments = new ArrayList<>();
final File[] segmentsToLoad = retrieveSegmentMetadataFiles();
@@ -717,7 +738,7 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
}
} else {
// entry is not reserved, clean it up
-
deleteCacheEntryDirectory(cacheEntry.toPotentialLocation(location.getPath()));
+
atomicMoveAndDeleteCacheEntryDirectory(cacheEntry.toPotentialLocation(location.getPath()));
}
}
}
@@ -747,13 +768,22 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
}
/**
- * Deletes a directory and logs about it. This method should only be called
under the lock of a {@link #segmentLocks}
+ * Performs an atomic move to a sibling {@link #DROP_PATH} directory, and
then deletes the directory and logs about
+ * it. This method should only be called under the lock of a {@link
#segmentLocks}.
*/
- private static void deleteCacheEntryDirectory(final File path)
+ private static void atomicMoveAndDeleteCacheEntryDirectory(final File path)
{
- log.info("Deleting directory[%s]", path);
+ final File parent = path.getParentFile();
+ final File tempLocation = new File(parent, DROP_PATH);
try {
- FileUtils.deleteDirectory(path);
+ if (!tempLocation.exists()) {
+ FileUtils.mkdirp(tempLocation);
+ }
+ final File tempPath = new File(tempLocation, path.getName());
+ log.debug("moving[%s] to temp location[%s]", path, tempLocation);
+ Files.move(path.toPath(), tempPath.toPath(),
StandardCopyOption.ATOMIC_MOVE);
+ log.info("Deleting directory[%s]", path);
+ FileUtils.deleteDirectory(tempPath);
}
catch (Exception e) {
log.error(e, "Unable to remove directory[%s]", path);
@@ -761,7 +791,7 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
}
/**
- * Calls {@link #deleteCacheEntryDirectory(File)} and then checks parent
path if it is empty, and recursively
+ * Calls {@link FileUtils#deleteDirectory(File)} and then checks parent path
if it is empty, and recursively
* continues until a non-empty directory or the base path is reached. This
method is not thread-safe, and should only
* be used by a single caller.
*/
@@ -771,7 +801,13 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
return;
}
- deleteCacheEntryDirectory(cacheFile);
+ try {
+ log.info("Deleting migrated segment directory[%s]", cacheFile);
+ FileUtils.deleteDirectory(cacheFile);
+ }
+ catch (Exception e) {
+ log.warn(e, "Unable to remove directory[%s]", cacheFile);
+ }
File parent = cacheFile.getParentFile();
if (parent != null) {
@@ -898,7 +934,7 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
"[%s] may be damaged. Delete all the segment files and pull
from DeepStorage again.",
storageDir.getAbsolutePath()
);
- deleteCacheEntryDirectory(storageDir);
+ atomicMoveAndDeleteCacheEntryDirectory(storageDir);
} else {
needsLoad = false;
}
@@ -973,7 +1009,7 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
return;
}
if (storageDir != null) {
- deleteCacheEntryDirectory(storageDir);
+ atomicMoveAndDeleteCacheEntryDirectory(storageDir);
storageDir = null;
location = null;
}
diff --git
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
index 366b3cbbc33..f0ae0042364 100644
---
a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
+++
b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerConcurrencyTest.java
@@ -716,8 +716,9 @@ class SegmentLocalCacheManagerConcurrencyTest
Assertions.assertEquals(0, location2.getActiveWeakHolds());
Assertions.assertTrue(4 >= location.getWeakEntryCount());
Assertions.assertTrue(4 >= location2.getWeakEntryCount());
- Assertions.assertTrue(4 >= location.getPath().listFiles().length);
- Assertions.assertTrue(4 >= location2.getPath().listFiles().length);
+ // 5 because __drop path
+ Assertions.assertTrue(5 >= location.getPath().listFiles().length);
+ Assertions.assertTrue(5 >= location2.getPath().listFiles().length);
Assertions.assertTrue(location.getStats().getLoadCount() >= 4);
Assertions.assertTrue(location2.getStats().getLoadCount() >= 4);
Assertions.assertEquals(location.getStats().getEvictionCount(),
location.getStats().getUnmountCount());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]