This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cce3dca714e IGNITE-27926 Implement Raft group destruction in Segstore (#7958)
cce3dca714e is described below
commit cce3dca714e5e58902ba3617fc210a85cbcf1432
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Apr 16 17:00:38 2026 +0300
IGNITE-27926 Implement Raft group destruction in Segstore (#7958)
---
.../raft/storage/segstore/IndexFileManager.java | 16 +++-
.../raft/storage/segstore/SegmentFileManager.java | 15 +++
.../storage/segstore/SegmentLogStorageManager.java | 6 +-
.../storage/segstore/IndexFileManagerTest.java | 51 +++++++++++
.../segstore/RaftLogGarbageCollectorTest.java | 101 ++++++++++++++++++++-
5 files changed, 184 insertions(+), 5 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index 4b8453c69ad..668b0d23979 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft.storage.segstore;
import static java.lang.Math.toIntExact;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX;
import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
import static org.apache.ignite.internal.util.IgniteUtils.fsyncFile;
@@ -137,7 +138,6 @@ class IndexFileManager {
/**
* Index file metadata grouped by Raft Group ID.
*/
- // FIXME: This map is never cleaned up, see
https://issues.apache.org/jira/browse/IGNITE-27926.
private final Map<Long, GroupIndexMeta> groupIndexMetas = new
ConcurrentHashMap<>();
IndexFileManager(Path baseDir) throws IOException {
@@ -430,6 +430,10 @@ class IndexFileManager {
return null;
}
+ if (firstIndexKept == GROUP_DESTROY_LOG_INDEX) {
+ return null;
+ }
+
if (firstIndexKept == -1 || firstIndexKept <= firstLogIndexInclusive) {
// No prefix truncation required, simply create a new meta.
return new IndexFileMeta(firstLogIndexInclusive,
lastLogIndexExclusive, payloadOffset, fileProperties);
@@ -444,13 +448,19 @@ class IndexFileManager {
}
private void putIndexFileMeta(IndexMetaSpec metaSpec) {
- IndexFileMeta indexFileMeta = metaSpec.indexFileMeta();
-
// Using boxed value to avoid unnecessary autoboxing later.
Long groupId = metaSpec.groupId();
long firstIndexKept = metaSpec.firstIndexKept();
+ if (firstIndexKept == GROUP_DESTROY_LOG_INDEX) {
+ groupIndexMetas.remove(groupId);
+
+ return;
+ }
+
+ IndexFileMeta indexFileMeta = metaSpec.indexFileMeta();
+
GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId);
if (existingGroupIndexMeta == null) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 6c9f952acb7..08cad457078 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -114,6 +114,13 @@ class SegmentFileManager implements ManuallyCloseable {
*/
static final byte[] SWITCH_SEGMENT_RECORD = new byte[8]; // 8 zero bytes.
+ /**
+ * Special "destroy group" sentinel value for the log reset index, passed to {@code reset} by {@link #destroyGroup(long)}.
+ *
+ * <p>Must not overlap with a valid stored log index.
+ */
+ static final long GROUP_DESTROY_LOG_INDEX = Long.MIN_VALUE;
+
private final String storageName;
private final Path segmentFilesDir;
@@ -422,6 +429,14 @@ class SegmentFileManager implements ManuallyCloseable {
}
}
+ /**
+ * Destroys all log data for the group {@code groupId} by resetting it to the {@link #GROUP_DESTROY_LOG_INDEX} tombstone, so that the GC
+ * can discard the group's entries on the next compaction pass. Any {@link IOException} comes from the underlying {@code reset} call.
+ */
+ void destroyGroup(long groupId) throws IOException {
+ reset(groupId, GROUP_DESTROY_LOG_INDEX);
+ }
+
private WriteBufferWithMemtable reserveBytesWithRollover(int size) throws
IOException {
while (true) {
SegmentFileWithMemtable segmentFileWithMemtable =
currentSegmentFile();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java
index 97cf85b013a..ce25b88e8ac 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java
@@ -78,7 +78,11 @@ public class SegmentLogStorageManager implements
LogStorageManager {
@Override
public void destroyLogStorage(String raftNodeStorageId) {
- // TODO IGNITE-28527 Implement.
+ try {
+ fileManager.destroyGroup(convertNodeId(raftNodeStorageId));
+ } catch (IOException e) {
+ throw new LogStorageException(String.format("Failed to destroy log
storage [storageId=%s]", raftNodeStorageId), e);
+ }
}
@Override
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
index 78453d5f3c6..3d70d171299 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -644,4 +645,54 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.getSegmentFilePointer(1, 2),
is(nullValue()));
}
+
+ @Test
+ void testDestroyGroupSoleTombstoneClearsIndexMetaOnCheckpoint() throws
IOException {
+ var memtable = new SingleThreadMemTable();
+ memtable.appendSegmentFileOffset(0, 1, 10);
+ memtable.appendSegmentFileOffset(1, 1, 20);
+ indexFileManager.saveNewIndexMemtable(memtable); // Baseline: groups 0 and 1 each have one entry at log index 1.
+
+ assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
+ assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
+
+ // Sole tombstone: the segment file had no entries for group 0, only
the destroy tombstone.
+ var destroyMemtable = new SingleThreadMemTable();
+ destroyMemtable.reset(0, GROUP_DESTROY_LOG_INDEX);
+ indexFileManager.saveNewIndexMemtable(destroyMemtable);
+
+ assertThat(indexFileManager.getSegmentFilePointer(0, 1),
is(nullValue()));
+ assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L)); // After the destroy, group 0 reports -1 for both bounds.
+ assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L));
+
+ // Group 1 was not reset, so its pointer and first index must still match what the first memtable stored.
+ assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new
SegmentFilePointer(new FileProperties(0), 20)));
+ assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
+ }
+
+ @Test
+ void testDestroyGroupWithPriorDataClearsIndexMetaOnCheckpoint() throws
IOException {
+ var memtable = new SingleThreadMemTable();
+ memtable.appendSegmentFileOffset(0, 1, 10);
+ memtable.appendSegmentFileOffset(1, 1, 20);
+ indexFileManager.saveNewIndexMemtable(memtable); // Baseline: groups 0 and 1 each have one entry at log index 1.
+
+ assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
+
+ // Group 0 writes more entries in the next segment, then gets
destroyed in the same segment.
+ var destroyMemtable = new SingleThreadMemTable();
+ destroyMemtable.appendSegmentFileOffset(0, 2, 100);
+ destroyMemtable.appendSegmentFileOffset(0, 3, 200);
+ destroyMemtable.reset(0, GROUP_DESTROY_LOG_INDEX);
+ indexFileManager.saveNewIndexMemtable(destroyMemtable);
+
+ assertThat(indexFileManager.getSegmentFilePointer(0, 1),
is(nullValue()));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 2),
is(nullValue()));
+ assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L)); // After the destroy, group 0 reports -1 for both bounds.
+ assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L));
+
+ // Group 1 was not reset, so its pointer and first index must still match what the first memtable stored.
+ assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new
SegmentFilePointer(new FileProperties(0), 20)));
+ assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
index 2ce340b12f2..9ff9d7d4eb6 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -646,6 +647,100 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
});
}
+ /**
+ * Verifies that after {@link SegmentFileManager#destroyGroup} is called and a checkpoint has processed the tombstone, compaction removes
+ * the old segment and index files, which held only the destroyed group's entries, and that those entries can no longer be read.
+ */
+ @Test
+ void testSegmentFilesRemovedByGcAfterGroupDestroy() throws Exception {
+ List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ }
+
+ await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size()
- 1)));
+
+ List<Path> oldSegmentFiles = segmentFiles().subList(0,
segmentFiles().size() - 1);
+ List<Path> oldIndexFiles = indexFiles();
+
+ fileManager.destroyGroup(GROUP_ID_1);
+
+ // The destroy tombstone is in the current segment file's memtable.
Trigger a rollover using a different group so the
+ // checkpoint thread processes the tombstone and removes GROUP_ID_1
from the in-memory index.
+ triggerAndAwaitCheckpoint(GROUP_ID_2, 0);
+
+ for (Path segmentFile : oldSegmentFiles) {
+ runCompaction(segmentFile);
+ }
+
+ for (Path segmentFile : oldSegmentFiles) {
+ assertThat(segmentFile, not(exists()));
+ }
+
+ for (Path indexFile : oldIndexFiles) {
+ assertThat(indexFile, not(exists()));
+ }
+
+ for (int i = 0; i < batches.size(); i++) {
+ int index = i;
+
+ fileManager.getEntry(GROUP_ID_1, index, bs -> {
+ fail("Entry for index " + index + " must be missing");
+
+ return null;
+ });
+ }
+ }
+
+ /**
+ * Verifies that when a group is destroyed while another group shares the same segment files, GC replaces each shared file with its next
+ * generation, dropping only the destroyed group's entries while the surviving group's entries stay readable.
+ */
+ @Test
+ void testSegmentFilesCompactedByGcAfterGroupDestroyWithSurvivingGroup()
throws Exception {
+ List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ appendBytes(GROUP_ID_2, batches.get(i), i);
+ }
+
+ await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size()
- 1)));
+
+ List<Path> oldSegmentFiles = segmentFiles().subList(0,
segmentFiles().size() - 1);
+
+ // Destroy one group while the other remains active.
+ fileManager.destroyGroup(GROUP_ID_1);
+
+ // Trigger a rollover using GROUP_ID_2 so the checkpoint thread
processes the tombstone and removes GROUP_ID_1 from
+ // the in-memory index, enabling GC to compact the shared segment
files.
+ triggerAndAwaitCheckpoint(GROUP_ID_2, batches.size() - 1);
+
+ for (Path segmentFile : oldSegmentFiles) {
+ FileProperties originalProperties =
SegmentFile.fileProperties(segmentFile);
+
+ runCompaction(segmentFile);
+
+ // Original file must have been replaced by a compacted generation
(GROUP_ID_2 entries survive).
+ assertThat(segmentFile, not(exists()));
+
+ var newFileProperties = new
FileProperties(originalProperties.ordinal(), originalProperties.generation() +
1);
+
+
assertThat(fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties)),
exists());
+ }
+
+ // GROUP_ID_2 entries must all still be readable.
+ for (int i = 0; i < batches.size(); i++) {
+ int index = i;
+
+ fileManager.getEntry(GROUP_ID_2, i, bs -> {
+ assertThat(bs, is(batches.get(index)));
+ return null;
+ });
+ }
+ }
+
@Test
void testLogSizeBytesInitializedCorrectlyOnStartup() throws Exception {
// Fill multiple segment files to create both segment and index files.
@@ -717,13 +812,17 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
}
private void triggerAndAwaitCheckpoint(long lastGroupIndex) throws
IOException {
+ triggerAndAwaitCheckpoint(GROUP_ID_1, lastGroupIndex);
+ }
+
+ private void triggerAndAwaitCheckpoint(long groupId, long lastGroupIndex)
throws IOException {
List<Path> segmentFilesBeforeCheckpoint = segmentFiles();
// Insert some entries to trigger a rollover (and a checkpoint).
List<byte[]> batches = createRandomData(FILE_SIZE / 4, 5);
for (int i = 0; i < batches.size(); i++) {
- appendBytes(GROUP_ID_1, batches.get(i), lastGroupIndex + i + 1);
+ appendBytes(groupId, batches.get(i), lastGroupIndex + i + 1);
}
List<Path> segmentFilesAfterCheckpoint = segmentFiles();