This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new cce3dca714e IGNITE-27926 Implement Raft group destruction in Segstore 
(#7958)
cce3dca714e is described below

commit cce3dca714e5e58902ba3617fc210a85cbcf1432
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Apr 16 17:00:38 2026 +0300

    IGNITE-27926 Implement Raft group destruction in Segstore (#7958)
---
 .../raft/storage/segstore/IndexFileManager.java    |  16 +++-
 .../raft/storage/segstore/SegmentFileManager.java  |  15 +++
 .../storage/segstore/SegmentLogStorageManager.java |   6 +-
 .../storage/segstore/IndexFileManagerTest.java     |  51 +++++++++++
 .../segstore/RaftLogGarbageCollectorTest.java      | 101 ++++++++++++++++++++-
 5 files changed, 184 insertions(+), 5 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index 4b8453c69ad..668b0d23979 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft.storage.segstore;
 import static java.lang.Math.toIntExact;
 import static java.nio.file.StandardOpenOption.CREATE_NEW;
 import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX;
 import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
 import static org.apache.ignite.internal.util.IgniteUtils.fsyncFile;
 
@@ -137,7 +138,6 @@ class IndexFileManager {
     /**
      * Index file metadata grouped by Raft Group ID.
      */
-    // FIXME: This map is never cleaned up, see 
https://issues.apache.org/jira/browse/IGNITE-27926.
     private final Map<Long, GroupIndexMeta> groupIndexMetas = new 
ConcurrentHashMap<>();
 
     IndexFileManager(Path baseDir) throws IOException {
@@ -430,6 +430,10 @@ class IndexFileManager {
             return null;
         }
 
+        if (firstIndexKept == GROUP_DESTROY_LOG_INDEX) {
+            return null;
+        }
+
         if (firstIndexKept == -1 || firstIndexKept <= firstLogIndexInclusive) {
             // No prefix truncation required, simply create a new meta.
             return new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, fileProperties);
@@ -444,13 +448,19 @@ class IndexFileManager {
     }
 
     private void putIndexFileMeta(IndexMetaSpec metaSpec) {
-        IndexFileMeta indexFileMeta = metaSpec.indexFileMeta();
-
         // Using boxed value to avoid unnecessary autoboxing later.
         Long groupId = metaSpec.groupId();
 
         long firstIndexKept = metaSpec.firstIndexKept();
 
+        if (firstIndexKept == GROUP_DESTROY_LOG_INDEX) {
+            groupIndexMetas.remove(groupId);
+
+            return;
+        }
+
+        IndexFileMeta indexFileMeta = metaSpec.indexFileMeta();
+
         GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId);
 
         if (existingGroupIndexMeta == null) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 6c9f952acb7..08cad457078 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -114,6 +114,13 @@ class SegmentFileManager implements ManuallyCloseable {
      */
     static final byte[] SWITCH_SEGMENT_RECORD = new byte[8]; // 8 zero bytes.
 
+    /**
+     * Special "destroy group" sentinel value for the log reset index.
+     *
+     * <p>Must not overlap with a valid stored log index.
+     */
+    static final long GROUP_DESTROY_LOG_INDEX = Long.MIN_VALUE;
+
     private final String storageName;
 
     private final Path segmentFilesDir;
@@ -422,6 +429,14 @@ class SegmentFileManager implements ManuallyCloseable {
         }
     }
 
+    /**
+     * Destroys all log data for the given group. Writes a tombstone using 
{@link #GROUP_DESTROY_LOG_INDEX} so that the GC can discard
+     * the group's entries on the next compaction pass.
+     */
+    void destroyGroup(long groupId) throws IOException {
+        reset(groupId, GROUP_DESTROY_LOG_INDEX);
+    }
+
     private WriteBufferWithMemtable reserveBytesWithRollover(int size) throws 
IOException {
         while (true) {
             SegmentFileWithMemtable segmentFileWithMemtable = 
currentSegmentFile();
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java
index 97cf85b013a..ce25b88e8ac 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentLogStorageManager.java
@@ -78,7 +78,11 @@ public class SegmentLogStorageManager implements 
LogStorageManager {
 
     @Override
     public void destroyLogStorage(String raftNodeStorageId) {
-        // TODO IGNITE-28527 Implement.
+        try {
+            fileManager.destroyGroup(convertNodeId(raftNodeStorageId));
+        } catch (IOException e) {
+            throw new LogStorageException(String.format("Failed to destroy log 
storage [storageId=%s]", raftNodeStorageId), e);
+        }
     }
 
     @Override
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
index 78453d5f3c6..3d70d171299 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.GROUP_DESTROY_LOG_INDEX;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -644,4 +645,54 @@ class IndexFileManagerTest extends IgniteAbstractTest {
 
         assertThat(indexFileManager.getSegmentFilePointer(1, 2), 
is(nullValue()));
     }
+
+    @Test
+    void testDestroyGroupSoleTombstoneClearsIndexMetaOnCheckpoint() throws 
IOException {
+        var memtable = new SingleThreadMemTable();
+        memtable.appendSegmentFileOffset(0, 1, 10);
+        memtable.appendSegmentFileOffset(1, 1, 20);
+        indexFileManager.saveNewIndexMemtable(memtable);
+
+        assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
+        assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
+
+        // Sole tombstone: the segment file had no entries for group 0, only 
the destroy tombstone.
+        var destroyMemtable = new SingleThreadMemTable();
+        destroyMemtable.reset(0, GROUP_DESTROY_LOG_INDEX);
+        indexFileManager.saveNewIndexMemtable(destroyMemtable);
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), 
is(nullValue()));
+        assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L));
+        assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L));
+
+        // Group 1 must be unaffected.
+        assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new 
SegmentFilePointer(new FileProperties(0), 20)));
+        assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
+    }
+
+    @Test
+    void testDestroyGroupWithPriorDataClearsIndexMetaOnCheckpoint() throws 
IOException {
+        var memtable = new SingleThreadMemTable();
+        memtable.appendSegmentFileOffset(0, 1, 10);
+        memtable.appendSegmentFileOffset(1, 1, 20);
+        indexFileManager.saveNewIndexMemtable(memtable);
+
+        assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
+
+        // Group 0 writes more entries in the next segment, then gets 
destroyed in the same segment.
+        var destroyMemtable = new SingleThreadMemTable();
+        destroyMemtable.appendSegmentFileOffset(0, 2, 100);
+        destroyMemtable.appendSegmentFileOffset(0, 3, 200);
+        destroyMemtable.reset(0, GROUP_DESTROY_LOG_INDEX);
+        indexFileManager.saveNewIndexMemtable(destroyMemtable);
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), 
is(nullValue()));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), 
is(nullValue()));
+        assertThat(indexFileManager.firstLogIndexInclusive(0), is(-1L));
+        assertThat(indexFileManager.lastLogIndexExclusive(0), is(-1L));
+
+        // Group 1 must be unaffected.
+        assertThat(indexFileManager.getSegmentFilePointer(1, 1), is(new 
SegmentFilePointer(new FileProperties(0), 20)));
+        assertThat(indexFileManager.firstLogIndexInclusive(1), is(1L));
+    }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
index 2ce340b12f2..9ff9d7d4eb6 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -646,6 +647,100 @@ class RaftLogGarbageCollectorTest extends 
IgniteAbstractTest {
         });
     }
 
+    /**
+     * Verifies that after {@link SegmentFileManager#destroyGroup} is called, 
the GC can remove segment files belonging
+     * to the destroyed group.
+     */
+    @Test
+    void testSegmentFilesRemovedByGcAfterGroupDestroy() throws Exception {
+        List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(GROUP_ID_1, batches.get(i), i);
+        }
+
+        await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() 
- 1)));
+
+        List<Path> oldSegmentFiles = segmentFiles().subList(0, 
segmentFiles().size() - 1);
+        List<Path> oldIndexFiles = indexFiles();
+
+        fileManager.destroyGroup(GROUP_ID_1);
+
+        // The destroy tombstone is in the current segment file's memtable. 
Trigger a rollover using a different group so the
+        // checkpoint thread processes the tombstone and removes GROUP_ID_1 
from the in-memory index.
+        triggerAndAwaitCheckpoint(GROUP_ID_2, 0);
+
+        for (Path segmentFile : oldSegmentFiles) {
+            runCompaction(segmentFile);
+        }
+
+        for (Path segmentFile : oldSegmentFiles) {
+            assertThat(segmentFile, not(exists()));
+        }
+
+        for (Path indexFile : oldIndexFiles) {
+            assertThat(indexFile, not(exists()));
+        }
+
+        for (int i = 0; i < batches.size(); i++) {
+            int index = i;
+
+            fileManager.getEntry(GROUP_ID_1, index, bs -> {
+                fail("Entry for index " + index + " must be missing");
+
+                return null;
+            });
+        }
+    }
+
+    /**
+     * Verifies that when a group is destroyed while another group shares the 
same segment files, GC compacts each shared file by dropping
+     * only the destroyed group's entries while preserving the surviving 
group's entries.
+     */
+    @Test
+    void testSegmentFilesCompactedByGcAfterGroupDestroyWithSurvivingGroup() 
throws Exception {
+        List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(GROUP_ID_1, batches.get(i), i);
+            appendBytes(GROUP_ID_2, batches.get(i), i);
+        }
+
+        await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() 
- 1)));
+
+        List<Path> oldSegmentFiles = segmentFiles().subList(0, 
segmentFiles().size() - 1);
+
+        // Destroy one group while the other remains active.
+        fileManager.destroyGroup(GROUP_ID_1);
+
+        // Trigger a rollover using GROUP_ID_2 so the checkpoint thread 
processes the tombstone and removes GROUP_ID_1 from
+        // the in-memory index, enabling GC to compact the shared segment 
files.
+        triggerAndAwaitCheckpoint(GROUP_ID_2, batches.size() - 1);
+
+        for (Path segmentFile : oldSegmentFiles) {
+            FileProperties originalProperties = 
SegmentFile.fileProperties(segmentFile);
+
+            runCompaction(segmentFile);
+
+            // Original file must have been replaced by a compacted generation 
(GROUP_ID_2 entries survive).
+            assertThat(segmentFile, not(exists()));
+
+            var newFileProperties = new 
FileProperties(originalProperties.ordinal(), originalProperties.generation() + 
1);
+
+            
assertThat(fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties)),
 exists());
+        }
+
+        // GROUP_ID_2 entries must all still be readable.
+        for (int i = 0; i < batches.size(); i++) {
+            int index = i;
+
+            fileManager.getEntry(GROUP_ID_2, i, bs -> {
+                assertThat(bs, is(batches.get(index)));
+                return null;
+            });
+        }
+    }
+
     @Test
     void testLogSizeBytesInitializedCorrectlyOnStartup() throws Exception {
         // Fill multiple segment files to create both segment and index files.
@@ -717,13 +812,17 @@ class RaftLogGarbageCollectorTest extends 
IgniteAbstractTest {
     }
 
     private void triggerAndAwaitCheckpoint(long lastGroupIndex) throws 
IOException {
+        triggerAndAwaitCheckpoint(GROUP_ID_1, lastGroupIndex);
+    }
+
+    private void triggerAndAwaitCheckpoint(long groupId, long lastGroupIndex) 
throws IOException {
         List<Path> segmentFilesBeforeCheckpoint = segmentFiles();
 
         // Insert some entries to trigger a rollover (and a checkpoint).
         List<byte[]> batches = createRandomData(FILE_SIZE / 4, 5);
 
         for (int i = 0; i < batches.size(); i++) {
-            appendBytes(GROUP_ID_1, batches.get(i), lastGroupIndex + i + 1);
+            appendBytes(groupId, batches.get(i), lastGroupIndex + i + 1);
         }
 
         List<Path> segmentFilesAfterCheckpoint = segmentFiles();

Reply via email to