This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c0c8663ee5e IGNITE-26294 Implement Raft log Garbage Collector (#7610)
c0c8663ee5e is described below

commit c0c8663ee5e75e929f97f1ee48dedd1da5f3a840
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Feb 25 18:16:47 2026 +0200

    IGNITE-26294 Implement Raft log Garbage Collector (#7610)
---
 .../raft/storage/segstore/GroupIndexMeta.java      |  60 ++-
 .../raft/storage/segstore/GroupInfoProvider.java   |  50 ++
 .../raft/storage/segstore/IndexFileManager.java    | 199 ++++++--
 .../raft/storage/segstore/IndexFileMetaArray.java  |  49 ++
 .../storage/segstore/RaftLogGarbageCollector.java  | 316 ++++++++++++
 .../raft/storage/segstore/SegmentFile.java         |  23 +-
 .../raft/storage/segstore/SegmentFileManager.java  |  61 ++-
 .../storage/segstore/SegmentPayloadParser.java     |   6 +-
 .../raft/storage/segstore/GroupIndexMetaTest.java  |  79 +++
 .../storage/segstore/IndexFileManagerTest.java     |  66 ++-
 .../segstore/RaftLogGarbageCollectorTest.java      | 549 +++++++++++++++++++++
 .../segstore/SegmentFileManagerGetEntryTest.java   |   1 +
 .../storage/segstore/SegmentFileManagerTest.java   |   6 +-
 .../raft/storage/segstore/SegmentFileTest.java     |   3 +-
 .../SegstoreLogStorageConcurrencyTest.java         |   1 +
 .../storage/segstore/SegstoreLogStorageTest.java   |   1 +
 16 files changed, 1363 insertions(+), 107 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
index 912fedb951a..edf3c50c54b 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
@@ -46,12 +46,6 @@ class GroupIndexMeta {
             this.fileMetas = new IndexFileMetaArray(startFileMeta);
         }
 
-        void addIndexMeta(IndexFileMeta indexFileMeta) {
-            IndexFileMetaArray fileMetas = this.fileMetas;
-
-            setFileMetas(fileMetas, fileMetas.add(indexFileMeta));
-        }
-
         long firstLogIndexInclusive() {
             return fileMetas.firstLogIndexInclusive();
         }
@@ -60,21 +54,48 @@ class GroupIndexMeta {
             return fileMetas.lastLogIndexExclusive();
         }
 
+        void addIndexMeta(IndexFileMeta indexFileMeta) {
+            while (true) {
+                IndexFileMetaArray fileMetas = this.fileMetas;
+
+                IndexFileMetaArray newFileMetas = fileMetas.add(indexFileMeta);
+
+                if (FILE_METAS_VH.compareAndSet(this, fileMetas, 
newFileMetas)) {
+                    return;
+                }
+            }
+        }
+
         /**
         * Removes all metas whose log indices are smaller than the given 
value.
          */
         void truncateIndicesSmallerThan(long firstLogIndexKept) {
-            IndexFileMetaArray fileMetas = this.fileMetas;
+            while (true) {
+                IndexFileMetaArray fileMetas = this.fileMetas;
+
+                IndexFileMetaArray newFileMetas = 
fileMetas.truncateIndicesSmallerThan(firstLogIndexKept);
 
-            setFileMetas(fileMetas, 
fileMetas.truncateIndicesSmallerThan(firstLogIndexKept));
+                if (FILE_METAS_VH.compareAndSet(this, fileMetas, 
newFileMetas)) {
+                    return;
+                }
+            }
         }
 
-        private void setFileMetas(IndexFileMetaArray fileMetas, 
IndexFileMetaArray newFileMetas) {
-            // Simple assignment would suffice, since we only have one thread 
writing to this field, but we use compareAndSet to verify
-            // this invariant, just in case.
-            boolean updated = FILE_METAS_VH.compareAndSet(this, fileMetas, 
newFileMetas);
+        boolean onIndexCompacted(FileProperties oldProperties, IndexFileMeta 
newProperties) {
+            while (true) {
+                IndexFileMetaArray fileMetas = this.fileMetas;
+
+                IndexFileMetaArray newFileMetas = 
fileMetas.onIndexCompacted(oldProperties, newProperties);
+
+                // Nothing was updated, which means the array does not contain 
an index meta for the compacted file.
+                if (fileMetas == newFileMetas) {
+                    return false;
+                }
 
-            assert updated : "Concurrent writes detected";
+                if (FILE_METAS_VH.compareAndSet(this, fileMetas, 
newFileMetas)) {
+                    return true;
+                }
+            }
         }
     }
 
@@ -183,6 +204,19 @@ class GroupIndexMeta {
         }
     }
 
+    /**
+     * Called when an index file is being compacted by the GC.
+     *
+     * <p>This means that we need to find index meta for the file being 
compacted and replace it with the meta of the new file.
+     */
+    void onIndexCompacted(FileProperties oldProperties, IndexFileMeta 
newIndexMeta) {
+        for (IndexMetaArrayHolder holder : fileMetaDeque) {
+            if (holder.onIndexCompacted(oldProperties, newIndexMeta)) {
+                return;
+            }
+        }
+    }
+
     long firstLogIndexInclusive() {
         for (IndexMetaArrayHolder indexMetaArrayHolder : fileMetaDeque) {
             long firstLogIndex = indexMetaArrayHolder.firstLogIndexInclusive();
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupInfoProvider.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupInfoProvider.java
new file mode 100644
index 00000000000..186a06d31fa
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupInfoProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides information about a Raft group present in the storage.
+ */
+@FunctionalInterface
+interface GroupInfoProvider {
+    GroupInfoProvider NO_OP = groupId -> null;
+
+    class GroupInfo {
+        private final long firstLogIndexInclusive;
+
+        private final long lastLogIndexExclusive;
+
+        GroupInfo(long firstLogIndexInclusive, long lastLogIndexExclusive) {
+            this.firstLogIndexInclusive = firstLogIndexInclusive;
+            this.lastLogIndexExclusive = lastLogIndexExclusive;
+        }
+
+        long firstLogIndexInclusive() {
+            return firstLogIndexInclusive;
+        }
+
+        long lastLogIndexExclusive() {
+            return lastLogIndexExclusive;
+        }
+    }
+
+    @Nullable
+    GroupInfo groupInfo(long groupId);
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index f5bb44f8635..468b51530a4 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -32,9 +32,12 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -129,6 +132,7 @@ class IndexFileManager {
     /**
      * Index file metadata grouped by Raft Group ID.
      */
+    // FIXME: This map is never cleaned up, see 
https://issues.apache.org/jira/browse/IGNITE-27926.
     private final Map<Long, GroupIndexMeta> groupIndexMetas = new 
ConcurrentHashMap<>();
 
     IndexFileManager(Path baseDir) throws IOException {
@@ -151,7 +155,7 @@ class IndexFileManager {
         return indexFilesDir;
     }
 
-    void cleanupTmpFiles() throws IOException {
+    void cleanupLeftoverFiles() throws IOException {
         try (Stream<Path> indexFiles = Files.list(indexFilesDir)) {
             Iterator<Path> it = indexFiles.iterator();
 
@@ -175,25 +179,31 @@ class IndexFileManager {
     Path saveNewIndexMemtable(ReadModeIndexMemTable indexMemTable) throws 
IOException {
         var newFileProperties = new FileProperties(++curFileOrdinal);
 
-        return saveIndexMemtable(indexMemTable, newFileProperties, false);
+        Path indexFilePath = indexFilePath(newFileProperties);
+
+        List<IndexMetaSpec> metaSpecs = saveIndexMemtable(indexFilePath, 
indexMemTable, newFileProperties);
+
+        metaSpecs.forEach(this::putIndexFileMeta);
+
+        return indexFilePath;
     }
 
-    private Path saveIndexMemtable(
+    private List<IndexMetaSpec> saveIndexMemtable(
+            Path indexFilePath,
             ReadModeIndexMemTable indexMemTable,
-            FileProperties fileProperties,
-            boolean onRecovery
+            FileProperties fileProperties
     ) throws IOException {
-        String fileName = indexFileName(fileProperties);
+        String fileName = indexFilePath.getFileName().toString();
 
         Path tmpFilePath = indexFilesDir.resolve(fileName + TMP_FILE_SUFFIX);
 
         assert !Files.exists(indexFilesDir.resolve(fileName)) : "Index file 
already exists: " + fileName;
         assert !Files.exists(tmpFilePath) : "Temporary index file already 
exists: " + tmpFilePath;
 
-        try (var os = new 
BufferedOutputStream(Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE))) {
-            byte[] headerBytes = serializeHeaderAndFillMetadata(indexMemTable, 
fileProperties, onRecovery);
+        FileHeaderWithIndexMetas fileHeaderWithIndexMetas = 
serializeHeaderAndFillMetadata(indexMemTable, fileProperties);
 
-            os.write(headerBytes);
+        try (var os = new 
BufferedOutputStream(Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE))) {
+            os.write(fileHeaderWithIndexMetas.header());
 
             Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
 
@@ -207,7 +217,9 @@ class IndexFileManager {
             }
         }
 
-        return syncAndRename(tmpFilePath, 
tmpFilePath.resolveSibling(fileName));
+        syncAndRename(tmpFilePath, tmpFilePath.resolveSibling(fileName));
+
+        return fileHeaderWithIndexMetas.indexMetas();
     }
 
     /**
@@ -215,7 +227,33 @@ class IndexFileManager {
      * lost due to a component stop before a checkpoint was able to complete.
      */
     void recoverIndexFile(ReadModeIndexMemTable indexMemTable, FileProperties 
fileProperties) throws IOException {
-        saveIndexMemtable(indexMemTable, fileProperties, true);
+        // On recovery we are only creating missing index files; in-memory 
meta will be created on Index File Manager start.
+        // (see recoverIndexFileMetas).
+        saveIndexMemtable(indexFilePath(fileProperties), indexMemTable, 
fileProperties);
+    }
+
+    Path onIndexFileCompacted(
+            ReadModeIndexMemTable indexMemTable,
+            FileProperties oldIndexFileProperties,
+            FileProperties newIndexFileProperties
+    ) throws IOException {
+        Path newIndexFilePath = indexFilePath(newIndexFileProperties);
+
+        List<IndexMetaSpec> metaSpecs = saveIndexMemtable(newIndexFilePath, 
indexMemTable, newIndexFileProperties);
+
+        metaSpecs.forEach(metaSpec -> {
+            GroupIndexMeta groupIndexMeta = 
groupIndexMetas.get(metaSpec.groupId);
+
+            IndexFileMeta meta = metaSpec.indexFileMeta();
+
+            if (groupIndexMeta != null && meta != null) {
+                groupIndexMeta.onIndexCompacted(oldIndexFileProperties, meta);
+            }
+        });
+
+        LOG.info("New index file created after compaction [path={}].", 
newIndexFilePath);
+
+        return newIndexFilePath;
     }
 
     /**
@@ -224,43 +262,51 @@ class IndexFileManager {
      */
     @Nullable
     SegmentFilePointer getSegmentFilePointer(long groupId, long logIndex) 
throws IOException {
-        GroupIndexMeta groupIndexMeta = groupIndexMetas.get(groupId);
+        while (true) {
+            GroupIndexMeta groupIndexMeta = groupIndexMetas.get(groupId);
 
-        if (groupIndexMeta == null) {
-            return null;
-        }
+            if (groupIndexMeta == null) {
+                return null;
+            }
 
-        IndexFileMeta indexFileMeta = groupIndexMeta.indexMeta(logIndex);
+            IndexFileMeta indexFileMeta = groupIndexMeta.indexMeta(logIndex);
 
-        if (indexFileMeta == null) {
-            return null;
-        }
+            if (indexFileMeta == null) {
+                return null;
+            }
 
-        Path indexFile = 
indexFilesDir.resolve(indexFileName(indexFileMeta.indexFileProperties()));
+            Path indexFile = 
indexFilesDir.resolve(indexFileName(indexFileMeta.indexFileProperties()));
 
-        // Index file payload is a 0-based array, which indices correspond to 
the [fileMeta.firstLogIndex, fileMeta.lastLogIndex) range.
-        long payloadArrayIndex = logIndex - 
indexFileMeta.firstLogIndexInclusive();
+            // TODO: Consider caching the opened channels for index files: 
https://issues.apache.org/jira/browse/IGNITE-26622.
+            try (SeekableByteChannel channel = Files.newByteChannel(indexFile, 
StandardOpenOption.READ)) {
+                // Index file payload is a 0-based array, which indices 
correspond to the [fileMeta.firstLogIndex, fileMeta.lastLogIndex)
+                // range.
+                long payloadArrayIndex = logIndex - 
indexFileMeta.firstLogIndexInclusive();
 
-        assert payloadArrayIndex >= 0 : payloadArrayIndex;
+                assert payloadArrayIndex >= 0 : payloadArrayIndex;
 
-        long payloadOffset = indexFileMeta.indexFilePayloadOffset() + 
payloadArrayIndex * Integer.BYTES;
+                long payloadOffset = indexFileMeta.indexFilePayloadOffset() + 
payloadArrayIndex * Integer.BYTES;
 
-        try (SeekableByteChannel channel = Files.newByteChannel(indexFile, 
StandardOpenOption.READ)) {
-            channel.position(payloadOffset);
+                channel.position(payloadOffset);
 
-            ByteBuffer segmentPayloadOffsetBuffer = 
ByteBuffer.allocate(Integer.BYTES).order(BYTE_ORDER);
+                ByteBuffer segmentPayloadOffsetBuffer = 
ByteBuffer.allocate(Integer.BYTES).order(BYTE_ORDER);
 
-            while (segmentPayloadOffsetBuffer.hasRemaining()) {
-                int bytesRead = channel.read(segmentPayloadOffsetBuffer);
+                while (segmentPayloadOffsetBuffer.hasRemaining()) {
+                    int bytesRead = channel.read(segmentPayloadOffsetBuffer);
 
-                if (bytesRead == -1) {
-                    throw new EOFException("EOF reached while reading index 
file: " + indexFile);
+                    if (bytesRead == -1) {
+                        throw new EOFException("EOF reached while reading 
index file: " + indexFile);
+                    }
                 }
-            }
 
-            int segmentPayloadOffset = segmentPayloadOffsetBuffer.getInt(0);
+                int segmentPayloadOffset = 
segmentPayloadOffsetBuffer.getInt(0);
 
-            return new SegmentFilePointer(indexFileMeta.indexFileProperties(), 
segmentPayloadOffset);
+                return new 
SegmentFilePointer(indexFileMeta.indexFileProperties(), segmentPayloadOffset);
+            } catch (NoSuchFileException e) {
+                // There exists a race between the Garbage Collection process 
and the "groupIndexMetas.get" call. It is possible that
+                // groupIndexMetas returned stale information; we should 
simply retry in this case.
+                LOG.info("Index file not found, retrying [path={}].", 
indexFile);
+            }
         }
     }
 
@@ -282,15 +328,14 @@ class IndexFileManager {
         return groupIndexMeta == null ? -1 : 
groupIndexMeta.lastLogIndexExclusive();
     }
 
-    boolean indexFileExists(FileProperties fileProperties) {
-        return Files.exists(indexFilePath(fileProperties));
-    }
-
     Path indexFilePath(FileProperties fileProperties) {
         return indexFilesDir.resolve(indexFileName(fileProperties));
     }
 
-    private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable 
indexMemTable, FileProperties fileProperties, boolean onRecovery) {
+    private static FileHeaderWithIndexMetas serializeHeaderAndFillMetadata(
+            ReadModeIndexMemTable indexMemTable,
+            FileProperties fileProperties
+    ) {
         int numGroups = indexMemTable.numGroups();
 
         int headerSize = headerSize(numGroups);
@@ -303,6 +348,8 @@ class IndexFileManager {
 
         int payloadOffset = headerSize;
 
+        var metaSpecs = new ArrayList<IndexMetaSpec>(numGroups);
+
         Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
 
         while (it.hasNext()) {
@@ -319,15 +366,11 @@ class IndexFileManager {
 
             long firstIndexKept = segmentInfo.firstIndexKept();
 
-            // On recovery we are only creating missing index files, in-memory 
meta will be created on Index File Manager start.
-            // (see recoverIndexFileMetas).
-            if (!onRecovery) {
-                IndexFileMeta indexFileMeta = createIndexFileMeta(
-                        firstLogIndexInclusive, lastLogIndexExclusive, 
firstIndexKept, payloadOffset, fileProperties
-                );
+            IndexFileMeta indexFileMeta = createIndexFileMeta(
+                    firstLogIndexInclusive, lastLogIndexExclusive, 
firstIndexKept, payloadOffset, fileProperties
+            );
 
-                putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
-            }
+            metaSpecs.add(new IndexMetaSpec(groupId, indexFileMeta, 
firstIndexKept));
 
             headerBuffer
                     .putLong(groupId)
@@ -340,7 +383,7 @@ class IndexFileManager {
             payloadOffset += payloadSize(segmentInfo);
         }
 
-        return headerBuffer.array();
+        return new FileHeaderWithIndexMetas(headerBuffer.array(), metaSpecs);
     }
 
     private static @Nullable IndexFileMeta createIndexFileMeta(
@@ -370,7 +413,14 @@ class IndexFileManager {
         return new IndexFileMeta(firstIndexKept, lastLogIndexExclusive, 
adjustedPayloadOffset, fileProperties);
     }
 
-    private void putIndexFileMeta(Long groupId, @Nullable IndexFileMeta 
indexFileMeta, long firstIndexKept) {
+    private void putIndexFileMeta(IndexMetaSpec metaSpec) {
+        IndexFileMeta indexFileMeta = metaSpec.indexFileMeta();
+
+        // Using boxed value to avoid unnecessary autoboxing later.
+        Long groupId = metaSpec.groupId();
+
+        long firstIndexKept = metaSpec.firstIndexKept();
+
         GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId);
 
         if (existingGroupIndexMeta == null) {
@@ -469,12 +519,14 @@ class IndexFileManager {
                         firstLogIndexInclusive, lastLogIndexExclusive, 
firstIndexKept, payloadOffset, fileProperties
                 );
 
-                putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
+                var metaSpec = new IndexMetaSpec(groupId, indexFileMeta, 
firstIndexKept);
+
+                putIndexFileMeta(metaSpec);
             }
         }
     }
 
-    private static FileProperties indexFileProperties(Path indexFile) {
+    static FileProperties indexFileProperties(Path indexFile) {
         String fileName = indexFile.getFileName().toString();
 
         Matcher matcher = INDEX_FILE_NAME_PATTERN.matcher(fileName);
@@ -498,4 +550,49 @@ class IndexFileManager {
 
         return result;
     }
+
+    private static class FileHeaderWithIndexMetas {
+        private final byte[] header;
+
+        private final List<IndexMetaSpec> indexMetas;
+
+        FileHeaderWithIndexMetas(byte[] header, List<IndexMetaSpec> 
indexMetas) {
+            this.header = header;
+            this.indexMetas = indexMetas;
+        }
+
+        byte[] header() {
+            return header;
+        }
+
+        List<IndexMetaSpec> indexMetas() {
+            return indexMetas;
+        }
+    }
+
+    private static class IndexMetaSpec {
+        private final Long groupId;
+
+        private final @Nullable IndexFileMeta indexFileMeta;
+
+        private final long firstIndexKept;
+
+        IndexMetaSpec(Long groupId, @Nullable IndexFileMeta indexFileMeta, 
long firstIndexKept) {
+            this.groupId = groupId;
+            this.indexFileMeta = indexFileMeta;
+            this.firstIndexKept = firstIndexKept;
+        }
+
+        Long groupId() {
+            return groupId;
+        }
+
+        @Nullable IndexFileMeta indexFileMeta() {
+            return indexFileMeta;
+        }
+
+        long firstIndexKept() {
+            return firstIndexKept;
+        }
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
index 46b9ffc5df8..5801bee7a16 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
@@ -163,4 +163,53 @@ class IndexFileMetaArray {
 
         return new IndexFileMetaArray(newArray, newSize);
     }
+
+    IndexFileMetaArray onIndexCompacted(FileProperties oldProperties, 
IndexFileMeta newMeta) {
+        // Find index meta associated with the file being compacted.
+        int smallestOrdinal = array[0].indexFileProperties().ordinal();
+
+        assert oldProperties.ordinal() >= smallestOrdinal;
+
+        int updateIndex = oldProperties.ordinal() - smallestOrdinal;
+
+        if (updateIndex >= size) {
+            return this;
+        }
+
+        IndexFileMeta oldMeta = array[updateIndex];
+
+        assert oldMeta.indexFileProperties().equals(oldProperties)
+                : String.format("File properties mismatch [expected=%s, 
actual=%s].", oldMeta.indexFileProperties(), oldProperties);
+
+        if (updateIndex > 0) {
+            IndexFileMeta prevOldMeta = array[updateIndex - 1];
+
+            assert newMeta.firstLogIndexInclusive() == 
prevOldMeta.lastLogIndexExclusive() :
+                    String.format("Index File Metas must be contiguous. 
Expected log index: %d, actual log index: %d",
+                            prevOldMeta.lastLogIndexExclusive(),
+                            newMeta.firstLogIndexInclusive()
+                    );
+        }
+
+        if (updateIndex < size - 1) {
+            IndexFileMeta nextOldMeta = array[updateIndex + 1];
+
+            assert newMeta.lastLogIndexExclusive() == 
nextOldMeta.firstLogIndexInclusive() :
+                    String.format("Index File Metas must be contiguous. 
Expected log index: %d, actual log index: %d",
+                            nextOldMeta.firstLogIndexInclusive(),
+                            newMeta.lastLogIndexExclusive()
+                    );
+        }
+
+        IndexFileMeta[] newArray = array.clone();
+
+        newArray[updateIndex] = newMeta;
+
+        return new IndexFileMetaArray(newArray, size);
+    }
+
+    @Override
+    public String toString() {
+        return Arrays.toString(array);
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
new file mode 100644
index 00000000000..f4fc31cf654
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static java.lang.Math.toIntExact;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.indexFileProperties;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.HEADER_RECORD;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.CRC_SIZE_BYTES;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.endOfSegmentReached;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.validateSegmentFileHeader;
+import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import 
org.apache.ignite.internal.raft.storage.segstore.GroupInfoProvider.GroupInfo;
+import org.apache.ignite.internal.raft.util.VarlenEncoder;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Garbage Collector for Raft log segment files.
+ *
+ * <p>The garbage collector performs compaction of segment files by removing 
truncated log entries and creating new generations
+ * of segment files. This process reclaims disk space occupied by log entries 
that have been truncated via
+ * {@link LogStorage#truncatePrefix} or {@link LogStorage#truncateSuffix} 
operations.
+ *
+ * <h2>Compaction Process</h2>
+ * When a segment file is selected for compaction, the GC:
+ * <ol>
+ *     <li>Copies non-truncated entries to a new segment file with an 
incremented generation number</li>
+ *     <li>Creates a new index file for the new generation</li>
+ *     <li>Atomically replaces the old segment file with the new one</li>
+ *     <li>Deletes the old segment file and its index file</li>
+ * </ol>
+ */
+class RaftLogGarbageCollector {
+    private static final IgniteLogger LOG = 
Loggers.forClass(RaftLogGarbageCollector.class);
+
+    private static final String TMP_FILE_SUFFIX = ".tmp";
+
+    private final Path segmentFilesDir;
+
+    private final IndexFileManager indexFileManager;
+
+    private final GroupInfoProvider groupInfoProvider;
+
+    private final AtomicLong logSize = new AtomicLong();
+
+    RaftLogGarbageCollector(
+            Path segmentFilesDir,
+            IndexFileManager indexFileManager,
+            GroupInfoProvider groupInfoProvider
+    ) {
+        this.segmentFilesDir = segmentFilesDir;
+        this.indexFileManager = indexFileManager;
+        this.groupInfoProvider = groupInfoProvider;
+    }
+
+    void cleanupLeftoverFiles() throws IOException {
+        FileProperties prevFileProperties = null;
+
+        try (Stream<Path> segmentFiles = Files.list(segmentFilesDir)) {
+            Iterator<Path> it = segmentFiles.sorted().iterator();
+
+            while (it.hasNext()) {
+                Path segmentFile = it.next();
+
+                if 
(segmentFile.getFileName().toString().endsWith(TMP_FILE_SUFFIX)) {
+                    LOG.info("Deleting temporary segment file [path = {}].", 
segmentFile);
+
+                    Files.delete(segmentFile);
+                } else {
+                    FileProperties fileProperties = 
SegmentFile.fileProperties(segmentFile);
+
+                    if (prevFileProperties != null && 
prevFileProperties.ordinal() == fileProperties.ordinal()) {
+                        Path prevPath = 
segmentFilesDir.resolve(SegmentFile.fileName(prevFileProperties));
+
+                        LOG.info("Deleting segment file because it has a 
higher generation version [path = {}].", prevPath);
+
+                        Files.delete(prevPath);
+                    }
+
+                    prevFileProperties = fileProperties;
+                }
+            }
+        }
+
+        // Do the same routine but for index files.
+        prevFileProperties = null;
+
+        try (Stream<Path> indexFiles = 
Files.list(indexFileManager.indexFilesDir())) {
+            Iterator<Path> it = indexFiles.sorted().iterator();
+
+            while (it.hasNext()) {
+                Path indexFile = it.next();
+
+                FileProperties fileProperties = indexFileProperties(indexFile);
+
+                // The GC does not create temporary index files, they are 
created by the index manager and are cleaned up by it.
+                if 
(!Files.exists(segmentFilesDir.resolve(SegmentFile.fileName(fileProperties)))) {
+                    LOG.info("Deleting index file because the corresponding 
segment file does not exist [path = {}].", indexFile);
+
+                    Files.delete(indexFile);
+                } else if (prevFileProperties != null && 
prevFileProperties.ordinal() == fileProperties.ordinal()) {
+                    Path prevPath = 
indexFileManager.indexFilePath(prevFileProperties);
+
+                    LOG.info("Deleting index file because it has a higher 
generation version [path = {}].", prevPath);
+
+                    Files.deleteIfExists(prevPath);
+                }
+
+                prevFileProperties = fileProperties;
+            }
+        }
+    }
+
+    // TODO: Optimize compaction of completely truncated files, see 
https://issues.apache.org/jira/browse/IGNITE-27964.
+    @VisibleForTesting
+    void compactSegmentFile(SegmentFile segmentFile) throws IOException {
+        LOG.info("Compacting segment file [path = {}].", segmentFile.path());
+
+        // Cache for avoiding excessive min/max log index computations.
+        var logStorageInfos = new Long2ObjectOpenHashMap<GroupInfo>();
+
+        ByteBuffer buffer = segmentFile.buffer();
+
+        validateSegmentFileHeader(buffer, segmentFile.path());
+
+        TmpSegmentFile tmpSegmentFile = null;
+
+        WriteModeIndexMemTable tmpMemTable = null;
+
+        try {
+            while (!endOfSegmentReached(buffer)) {
+                int originalStartOfRecordOffset = buffer.position();
+
+                long groupId = buffer.getLong();
+
+                int payloadLength = buffer.getInt();
+
+                if (payloadLength <= 0) {
+                    // Skip special entries (such as truncation records). They can always be omitted.
+                    // To identify such entries we rely on the fact that they have nonpositive length field value.
+                    int endOfRecordOffset = buffer.position() + Long.BYTES + 
CRC_SIZE_BYTES;
+
+                    buffer.position(endOfRecordOffset);
+
+                    continue;
+                }
+
+                int endOfRecordOffset = buffer.position() + payloadLength + 
CRC_SIZE_BYTES;
+
+                long index = VarlenEncoder.readLong(buffer);
+
+                GroupInfo info = logStorageInfos.computeIfAbsent(groupId, 
groupInfoProvider::groupInfo);
+
+                if (info == null || index < info.firstLogIndexInclusive() || 
index >= info.lastLogIndexExclusive()) {
+                    // We found a truncated entry, it should be skipped.
+                    buffer.position(endOfRecordOffset);
+
+                    continue;
+                }
+
+                if (tmpSegmentFile == null) {
+                    tmpSegmentFile = new TmpSegmentFile(segmentFile);
+
+                    tmpSegmentFile.writeHeader();
+
+                    tmpMemTable = new SingleThreadMemTable();
+                }
+
+                int oldLimit = buffer.limit();
+
+                // Set the buffer boundaries to only write the current record to the new file.
+                
buffer.position(originalStartOfRecordOffset).limit(endOfRecordOffset);
+
+                @SuppressWarnings("resource")
+                long newStartOfRecordOffset = 
tmpSegmentFile.fileChannel().position();
+
+                writeFully(tmpSegmentFile.fileChannel(), buffer);
+
+                buffer.limit(oldLimit);
+
+                tmpMemTable.appendSegmentFileOffset(groupId, index, 
toIntExact(newStartOfRecordOffset));
+            }
+
+            Path indexFilePath = 
indexFileManager.indexFilePath(segmentFile.fileProperties());
+
+            long logSizeDelta;
+
+            if (tmpSegmentFile != null) {
+                tmpSegmentFile.syncAndRename();
+
+                // Create a new index file and update the in-memory state to 
point to it.
+                Path newIndexFilePath = indexFileManager.onIndexFileCompacted(
+                        tmpMemTable.transitionToReadMode(),
+                        segmentFile.fileProperties(),
+                        tmpSegmentFile.fileProperties()
+                );
+
+                logSizeDelta = Files.size(segmentFile.path())
+                        + Files.size(indexFilePath)
+                        - tmpSegmentFile.size()
+                        - Files.size(newIndexFilePath);
+            } else {
+                // We got lucky and the whole file can be removed.
+                logSizeDelta = Files.size(segmentFile.path()) + 
Files.size(indexFilePath);
+            }
+
+            // Remove the previous generation of the segment file and its index. This is safe to do, because we rely on the file system
+            // guarantees that other threads reading from the segment file will still be able to do that even if the file is deleted.
+            Files.delete(segmentFile.path());
+            Files.delete(indexFilePath);
+
+            long newLogSize = logSize.addAndGet(-logSizeDelta);
+
+            if (LOG.isInfoEnabled()) {
+                if (tmpSegmentFile == null) {
+                    LOG.info(
+                            "Segment file removed (all entries are truncated) 
[path = {}, log size freed = {}, new log size = {}].",
+                            segmentFile.path(), logSizeDelta, newLogSize
+                    );
+                } else {
+                    LOG.info(
+                            "Segment file compacted [path = {}, log size freed 
= {}, new log size = {}].",
+                            segmentFile.path(), logSizeDelta, newLogSize
+                    );
+                }
+            }
+        } finally {
+            if (tmpSegmentFile != null) {
+                tmpSegmentFile.close();
+            }
+        }
+    }
+
+    private static void writeFully(WritableByteChannel channel, ByteBuffer 
buffer) throws IOException {
+        while (buffer.hasRemaining()) {
+            channel.write(buffer);
+        }
+    }
+
+    private class TmpSegmentFile implements ManuallyCloseable {
+        private final String fileName;
+
+        private final Path tmpFilePath;
+
+        private final FileChannel fileChannel;
+
+        private final FileProperties fileProperties;
+
+        TmpSegmentFile(SegmentFile originalFile) throws IOException {
+            FileProperties originalFileProperties = 
originalFile.fileProperties();
+
+            this.fileProperties = new 
FileProperties(originalFileProperties.ordinal(), 
originalFileProperties.generation() + 1);
+            this.fileName = SegmentFile.fileName(fileProperties);
+            this.tmpFilePath = segmentFilesDir.resolve(fileName + 
TMP_FILE_SUFFIX);
+            this.fileChannel = FileChannel.open(tmpFilePath, CREATE_NEW, 
WRITE);
+        }
+
+        void writeHeader() throws IOException {
+            fileChannel.write(ByteBuffer.wrap(HEADER_RECORD));
+        }
+
+        FileChannel fileChannel() {
+            return fileChannel;
+        }
+
+        void syncAndRename() throws IOException {
+            fileChannel.force(true);
+
+            atomicMoveFile(tmpFilePath, tmpFilePath.resolveSibling(fileName), 
LOG);
+        }
+
+        long size() throws IOException {
+            return fileChannel.size();
+        }
+
+        FileProperties fileProperties() {
+            return fileProperties;
+        }
+
+        @Override
+        public void close() throws IOException {
+            fileChannel.close();
+        }
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
index 86ed55fac0b..d008ddb4ac0 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
@@ -17,13 +17,16 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
@@ -77,9 +80,10 @@ class SegmentFile implements ManuallyCloseable {
     /** Lock used to atomically execute fsync. */
     private final Object syncLock = new Object();
 
-    private SegmentFile(RandomAccessFile file, Path path, boolean isSync) 
throws IOException {
-        //noinspection ChannelOpenedButNotSafelyClosed
-        buffer = file.getChannel().map(MapMode.READ_WRITE, 0, file.length());
+    private SegmentFile(FileChannel channel, Path path, boolean isSync) throws 
IOException {
+        buffer = channel.map(MapMode.READ_WRITE, 0, channel.size());
+
+        assert buffer.limit() > 0 : "File " + path + " is empty.";
 
         this.path = path;
         this.isSync = isSync;
@@ -97,20 +101,17 @@ class SegmentFile implements ManuallyCloseable {
             throw new IllegalArgumentException("File size is too big: " + 
fileSize);
         }
 
+        // Using the RandomAccessFile for its "setLength" method.
         try (var file = new RandomAccessFile(path.toFile(), "rw")) {
             file.setLength(fileSize);
 
-            return new SegmentFile(file, path, isSync);
+            return new SegmentFile(file.getChannel(), path, isSync);
         }
     }
 
     static SegmentFile openExisting(Path path, boolean isSync) throws 
IOException {
-        if (!Files.exists(path)) {
-            throw new IllegalArgumentException("File does not exist: " + path);
-        }
-
-        try (var file = new RandomAccessFile(path.toFile(), "rw")) {
-            return new SegmentFile(file, path, isSync);
+        try (var channel = FileChannel.open(path, READ, WRITE)) {
+            return new SegmentFile(channel, path, isSync);
         }
     }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 9f50c7598d0..cf1195a1621 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.raft.storage.segstore;
 import static java.lang.Math.toIntExact;
 import static 
org.apache.ignite.internal.raft.configuration.LogStorageConfigurationSchema.UNSPECIFIED_MAX_LOG_ENTRY_SIZE;
 import static 
org.apache.ignite.internal.raft.configuration.LogStorageConfigurationSchema.computeDefaultMaxLogEntrySizeBytes;
-import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.fileName;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.RESET_RECORD_SIZE;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_PREFIX_RECORD_SIZE;
@@ -31,6 +30,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicReference;
@@ -129,6 +129,8 @@ class SegmentFileManager implements ManuallyCloseable {
 
     private final IndexFileManager indexFileManager;
 
+    private final RaftLogGarbageCollector garbageCollector;
+
     /** Configured size of a segment file. */
     private final int segmentFileSize;
 
@@ -157,6 +159,7 @@ class SegmentFileManager implements ManuallyCloseable {
             Path baseDir,
             int stripes,
             FailureProcessor failureProcessor,
+            GroupInfoProvider groupInfoProvider,
             RaftConfiguration raftConfiguration,
             LogStorageConfiguration storageConfiguration
     ) throws IOException {
@@ -180,14 +183,15 @@ class SegmentFileManager implements ManuallyCloseable {
                 failureProcessor,
                 logStorageView.maxCheckpointQueueSize()
         );
+
+        garbageCollector = new RaftLogGarbageCollector(segmentFilesDir, 
indexFileManager, groupInfoProvider);
     }
 
     void start() throws IOException {
         LOG.info("Starting segment file manager [segmentFilesDir={}, 
fileSize={}].", segmentFilesDir, segmentFileSize);
 
-        indexFileManager.cleanupTmpFiles();
-
-        var payloadParser = new SegmentPayloadParser();
+        indexFileManager.cleanupLeftoverFiles();
+        garbageCollector.cleanupLeftoverFiles();
 
         Path lastSegmentFilePath = null;
 
@@ -204,10 +208,11 @@ class SegmentFileManager implements ManuallyCloseable {
                     // Create missing index files.
                     FileProperties segmentFileProperties = 
SegmentFile.fileProperties(segmentFilePath);
 
-                    if 
(!indexFileManager.indexFileExists(segmentFileProperties)) {
+                    // TODO: we may want to load the file list into memory, see https://issues.apache.org/jira/browse/IGNITE-27961
+                    if 
(!Files.exists(indexFileManager.indexFilePath(segmentFileProperties))) {
                         LOG.info("Creating missing index file for segment file 
{}.", segmentFilePath);
 
-                        SegmentFileWithMemtable segmentFileWithMemtable = 
recoverSegmentFile(segmentFilePath, payloadParser);
+                        SegmentFileWithMemtable segmentFileWithMemtable = 
recoverSegmentFile(segmentFilePath);
 
                         
indexFileManager.recoverIndexFile(segmentFileWithMemtable.memtable().transitionToReadMode(),
 segmentFileProperties);
                     }
@@ -220,7 +225,7 @@ class SegmentFileManager implements ManuallyCloseable {
         } else {
             curSegmentFileOrdinal = 
SegmentFile.fileProperties(lastSegmentFilePath).ordinal();
 
-            
currentSegmentFile.set(recoverLatestSegmentFile(lastSegmentFilePath, 
payloadParser));
+            
currentSegmentFile.set(recoverLatestSegmentFile(lastSegmentFilePath));
         }
 
         LOG.info("Segment file manager recovery completed. Current segment 
file: {}.", lastSegmentFilePath);
@@ -244,8 +249,13 @@ class SegmentFileManager implements ManuallyCloseable {
         return indexFileManager;
     }
 
+    @TestOnly
+    RaftLogGarbageCollector garbageCollector() {
+        return garbageCollector;
+    }
+
     private SegmentFileWithMemtable allocateNewSegmentFile(int fileOrdinal) 
throws IOException {
-        Path path = segmentFilesDir.resolve(fileName(new 
FileProperties(fileOrdinal)));
+        Path path = segmentFilesDir.resolve(SegmentFile.fileName(new 
FileProperties(fileOrdinal)));
 
         SegmentFile segmentFile = SegmentFile.createNew(path, segmentFileSize, 
isSync);
 
@@ -259,12 +269,12 @@ class SegmentFileManager implements ManuallyCloseable {
      * "complete" segment files (i.e. those that have experienced a rollover) 
this method is expected to be called on the most recent,
      * possibly incomplete segment file.
      */
-    private SegmentFileWithMemtable recoverLatestSegmentFile(Path 
segmentFilePath, SegmentPayloadParser payloadParser) throws IOException {
+    private SegmentFileWithMemtable recoverLatestSegmentFile(Path 
segmentFilePath) throws IOException {
         SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath, 
isSync);
 
         var memTable = new StripedMemTable(stripes);
 
-        payloadParser.recoverMemtable(segmentFile, memTable, true);
+        SegmentPayloadParser.recoverMemtable(segmentFile, memTable, true);
 
         return new SegmentFileWithMemtable(segmentFile, memTable, false);
     }
@@ -276,12 +286,12 @@ class SegmentFileManager implements ManuallyCloseable {
      * <p>This method skips CRC validation, because it is used to identify the 
end of incomplete segment files (and, by definition, this can
      * never happen during this method's invocation), not to validate storage 
integrity.
      */
-    private SegmentFileWithMemtable recoverSegmentFile(Path segmentFilePath, 
SegmentPayloadParser payloadParser) throws IOException {
+    private SegmentFileWithMemtable recoverSegmentFile(Path segmentFilePath) 
throws IOException {
         SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath, 
isSync);
 
         var memTable = new SingleThreadMemTable();
 
-        payloadParser.recoverMemtable(segmentFile, memTable, false);
+        SegmentPayloadParser.recoverMemtable(segmentFile, memTable, false);
 
         return new SegmentFileWithMemtable(segmentFile, memTable, false);
     }
@@ -541,20 +551,29 @@ class SegmentFileManager implements ManuallyCloseable {
     }
 
     private EntrySearchResult readFromOtherSegmentFiles(long groupId, long 
logIndex) throws IOException {
-        SegmentFilePointer segmentFilePointer = 
indexFileManager.getSegmentFilePointer(groupId, logIndex);
+        while (true) {
+            SegmentFilePointer segmentFilePointer = 
indexFileManager.getSegmentFilePointer(groupId, logIndex);
 
-        if (segmentFilePointer == null) {
-            return EntrySearchResult.notFound();
-        }
+            if (segmentFilePointer == null) {
+                return EntrySearchResult.notFound();
+            }
 
-        Path path = 
segmentFilesDir.resolve(fileName(segmentFilePointer.fileProperties()));
+            Path path = 
segmentFilesDir.resolve(SegmentFile.fileName(segmentFilePointer.fileProperties()));
 
-        // TODO: Add a cache for recently accessed segment files, see 
https://issues.apache.org/jira/browse/IGNITE-26622.
-        SegmentFile segmentFile = SegmentFile.openExisting(path, isSync);
+            // TODO: Add a cache for recently accessed segment files, see 
https://issues.apache.org/jira/browse/IGNITE-26622.
+            try {
+                SegmentFile segmentFile = SegmentFile.openExisting(path, 
isSync);
 
-        ByteBuffer buffer = 
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
+                ByteBuffer buffer = 
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
 
-        return EntrySearchResult.success(buffer);
+                return EntrySearchResult.success(buffer);
+            } catch (NoSuchFileException e) {
+                // When reading from a segment file based on information from the index manager, there exists a race with the Garbage
+                // Collector: index manager can return a pointer to a segment file that may have been compacted. In this case, we should
+                // just retry and get more recent information.
+                LOG.info("Segment file {} not found, retrying.", path);
+            }
+        }
     }
 
     private static int maxLogEntrySize(LogStorageView storageConfiguration) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
index 76e9b7477c6..e34bae493b8 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.raft.util.VarlenEncoder;
 import org.apache.ignite.internal.util.FastCrc;
 
 class SegmentPayloadParser {
-    void recoverMemtable(SegmentFile segmentFile, WriteModeIndexMemTable 
memtable, boolean validateCrc) {
+    static void recoverMemtable(SegmentFile segmentFile, 
WriteModeIndexMemTable memtable, boolean validateCrc) {
         ByteBuffer buffer = segmentFile.buffer();
 
         validateSegmentFileHeader(buffer, segmentFile.path());
@@ -104,7 +104,7 @@ class SegmentPayloadParser {
         }
     }
 
-    private static void validateSegmentFileHeader(ByteBuffer buffer, Path 
segmentFilePath) {
+    static void validateSegmentFileHeader(ByteBuffer buffer, Path 
segmentFilePath) {
         int magicNumber = buffer.getInt();
 
         if (magicNumber != MAGIC_NUMBER) {
@@ -132,7 +132,7 @@ class SegmentPayloadParser {
         return crc == expectedCrc;
     }
 
-    private static boolean endOfSegmentReached(ByteBuffer buffer) {
+    static boolean endOfSegmentReached(ByteBuffer buffer) {
         if (buffer.remaining() < SWITCH_SEGMENT_RECORD.length) {
             return true;
         }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
index 797e545d2a4..235e2e2063b 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.raft.storage.segstore;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.either;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
 import org.apache.ignite.internal.lang.RunnableX;
@@ -277,4 +280,80 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
         assertThat(groupMeta.indexMeta(19), is(nullValue()));
         assertThat(groupMeta.firstLogIndexInclusive(), is(-1L));
     }
+
+    @Test
+    void testOnIndexCompacted() {
+        var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0));
+        var meta2 = new IndexFileMeta(50, 100, 42, new FileProperties(1));
+        var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(2));
+
+        var groupMeta = new GroupIndexMeta(meta1);
+        groupMeta.addIndexMeta(meta2);
+        groupMeta.addIndexMeta(meta3);
+
+        var compactedMeta2 = new IndexFileMeta(50, 100, 42, new 
FileProperties(1, 1));
+        groupMeta.onIndexCompacted(new FileProperties(1), compactedMeta2);
+
+        assertThat(groupMeta.indexMeta(1), is(meta1));
+        assertThat(groupMeta.indexMeta(50), is(compactedMeta2));
+        assertThat(groupMeta.indexMeta(100), is(meta3));
+    }
+
+    @Test
+    void testOnIndexCompactedWithMultipleBlocks() {
+        // meta1 is in block 0.
+        var meta1 = new IndexFileMeta(1, 100, 0, new FileProperties(0));
+        var groupMeta = new GroupIndexMeta(meta1);
+
+        // meta2 overlaps meta1, creating a second block in the deque.
+        var meta2 = new IndexFileMeta(42, 100, 42, new FileProperties(1));
+        groupMeta.addIndexMeta(meta2);
+
+        // meta3 is added to the second block (consecutive to meta2).
+        var meta3 = new IndexFileMeta(100, 200, 84, new FileProperties(2));
+        groupMeta.addIndexMeta(meta3);
+
+        // Compact meta1 from the older block.
+        var compactedMeta1 = new IndexFileMeta(1, 100, 0, new 
FileProperties(0, 1));
+        groupMeta.onIndexCompacted(new FileProperties(0), compactedMeta1);
+
+        assertThat(groupMeta.indexMeta(1), is(compactedMeta1));
+        assertThat(groupMeta.indexMeta(42), is(meta2));
+        assertThat(groupMeta.indexMeta(100), is(meta3));
+
+        // Compact meta3 from the newer block.
+        var compactedMeta3 = new IndexFileMeta(100, 200, 84, new 
FileProperties(2, 1));
+        groupMeta.onIndexCompacted(new FileProperties(2), compactedMeta3);
+
+        assertThat(groupMeta.indexMeta(1), is(compactedMeta1));
+        assertThat(groupMeta.indexMeta(42), is(meta2));
+        assertThat(groupMeta.indexMeta(100), is(compactedMeta3));
+    }
+
+    @RepeatedTest(100)
+    void multithreadCompactionWithTruncatePrefix() {
+        var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0));
+        var meta2 = new IndexFileMeta(42, 100, 42, new FileProperties(1));
+        var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(2));
+
+        var compactedMeta2 = new IndexFileMeta(42, 100, 42, new 
FileProperties(1, 1));
+
+        var groupMeta = new GroupIndexMeta(meta1);
+        groupMeta.addIndexMeta(meta2);
+        groupMeta.addIndexMeta(meta3);
+
+        RunnableX compactionTask = () -> groupMeta.onIndexCompacted(new 
FileProperties(1), compactedMeta2);
+
+        RunnableX truncateTask = () -> groupMeta.truncatePrefix(43);
+
+        RunnableX readTask = () -> {
+            IndexFileMeta indexFileMeta = groupMeta.indexMeta(51);
+
+            assertThat(indexFileMeta, is(notNullValue()));
+            assertThat(indexFileMeta.firstLogIndexInclusive(), 
is(anyOf(equalTo(42L), equalTo(43L))));
+            assertThat(indexFileMeta.indexFileProperties().generation(), 
is(anyOf(equalTo(0), equalTo(1))));
+        };
+
+        runRace(compactionTask, truncateTask, readTask);
+    }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
index d46dc5c7c39..6da3b7067a8 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
@@ -23,11 +23,13 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 class IndexFileManagerTest extends IgniteAbstractTest {
@@ -379,8 +381,8 @@ class IndexFileManagerTest extends IgniteAbstractTest {
 
     @Test
     void testExists() throws IOException {
-        assertThat(indexFileManager.indexFileExists(new FileProperties(0)), 
is(false));
-        assertThat(indexFileManager.indexFileExists(new FileProperties(1)), 
is(false));
+        assertThat(Files.exists(indexFileManager.indexFilePath(new 
FileProperties(0))), is(false));
+        assertThat(Files.exists(indexFileManager.indexFilePath(new 
FileProperties(1))), is(false));
 
         var memtable = new StripedMemTable(STRIPES);
 
@@ -388,13 +390,13 @@ class IndexFileManagerTest extends IgniteAbstractTest {
 
         indexFileManager.saveNewIndexMemtable(memtable);
 
-        assertThat(indexFileManager.indexFileExists(new FileProperties(0)), 
is(true));
-        assertThat(indexFileManager.indexFileExists(new FileProperties(1)), 
is(false));
+        assertThat(Files.exists(indexFileManager.indexFilePath(new 
FileProperties(0))), is(true));
+        assertThat(Files.exists(indexFileManager.indexFilePath(new 
FileProperties(1))), is(false));
 
         indexFileManager.saveNewIndexMemtable(memtable);
 
-        assertThat(indexFileManager.indexFileExists(new FileProperties(0)), 
is(true));
-        assertThat(indexFileManager.indexFileExists(new FileProperties(1)), 
is(true));
+        assertThat(Files.exists(indexFileManager.indexFilePath(new 
FileProperties(0))), is(true));
+        assertThat(Files.exists(indexFileManager.indexFilePath(new 
FileProperties(1))), is(true));
     }
 
     @Test
@@ -556,4 +558,56 @@ class IndexFileManagerTest extends IgniteAbstractTest {
         assertThat(indexFileManager.firstLogIndexInclusive(0), is(2L));
         assertThat(indexFileManager.lastLogIndexExclusive(0), is(3L));
     }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-27980")
+    @Test
+    void testCompactionWithMissingGroups() throws IOException {
+        var memtable = new SingleThreadMemTable();
+
+        // First file contains entries from two groups.
+        memtable.appendSegmentFileOffset(0, 1, 1);
+        memtable.appendSegmentFileOffset(1, 1, 1);
+
+        indexFileManager.saveNewIndexMemtable(memtable);
+
+        memtable = new SingleThreadMemTable();
+
+        // Second file contains entries only from the first group.
+        memtable.appendSegmentFileOffset(0, 2, 2);
+
+        indexFileManager.saveNewIndexMemtable(memtable);
+
+        memtable = new SingleThreadMemTable();
+
+        // Third file contains entries from two groups again.
+        memtable.appendSegmentFileOffset(0, 3, 3);
+        memtable.appendSegmentFileOffset(0, 4, 4);
+        memtable.appendSegmentFileOffset(1, 2, 2);
+
+        indexFileManager.saveNewIndexMemtable(memtable);
+
+        // Truncate prefix of the group 0 so that one of the entries from the third file is removed.
+        memtable = new SingleThreadMemTable();
+
+        memtable.truncatePrefix(0, 4);
+
+        indexFileManager.saveNewIndexMemtable(memtable);
+
+        var compactedMemtable = new SingleThreadMemTable();
+
+        // We are removing an entry for group 0 from the third file.
+        compactedMemtable.appendSegmentFileOffset(1, 2, 2);
+
+        indexFileManager.onIndexFileCompacted(compactedMemtable, new 
FileProperties(2, 0), new FileProperties(2, 1));
+
+        assertThat(
+                indexFileManager.getSegmentFilePointer(0, 3),
+                is(nullValue())
+        );
+
+        assertThat(
+                indexFileManager.getSegmentFilePointer(1, 2),
+                is(new SegmentFilePointer(new FileProperties(2, 1), 2))
+        );
+    }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
new file mode 100644
index 00000000000..69b6e0c9f95
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import 
org.apache.ignite.internal.raft.storage.segstore.GroupInfoProvider.GroupInfo;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link RaftLogGarbageCollector}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(MockitoExtension.class)
+class RaftLogGarbageCollectorTest extends IgniteAbstractTest {
+    private static final int FILE_SIZE = 200;
+
+    private static final long GROUP_ID_1 = 1000;
+
+    private static final long GROUP_ID_2 = 2000;
+
+    private static final int STRIPES = 10;
+
+    private static final String NODE_NAME = "test";
+
+    @InjectConfiguration
+    private RaftConfiguration raftConfiguration;
+
+    @InjectConfiguration("mock.segmentFileSizeBytes=" + FILE_SIZE)
+    private LogStorageConfiguration storageConfiguration;
+
+    private SegmentFileManager fileManager;
+
+    private IndexFileManager indexFileManager;
+
+    private RaftLogGarbageCollector garbageCollector;
+
+    @Mock
+    private GroupInfoProvider groupInfoProvider;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        fileManager = new SegmentFileManager(
+                NODE_NAME,
+                workDir,
+                STRIPES,
+                new NoOpFailureManager(),
+                groupInfoProvider,
+                raftConfiguration,
+                storageConfiguration
+        );
+
+        fileManager.start();
+
+        indexFileManager = fileManager.indexFileManager();
+
+        garbageCollector = fileManager.garbageCollector();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (fileManager != null) {
+            fileManager.close();
+        }
+    }
+
+    @Test
+    void testCompactSegmentFileWithAllEntriesTruncated() throws Exception {
+        List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(GROUP_ID_1, batches.get(i), i);
+        }
+
+        await().until(this::indexFiles, hasSize(greaterThan(0)));
+
+        // This is equivalent to prefix truncation up to the latest index.
+        when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new 
GroupInfo(batches.size() - 1, batches.size() - 1));
+
+        List<Path> segmentFiles = segmentFiles();
+
+        Path segmentFilePath = segmentFiles.get(0);
+
+        FileProperties fileProperties = 
SegmentFile.fileProperties(segmentFilePath);
+
+        SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath, 
false);
+        try {
+            garbageCollector.compactSegmentFile(segmentFile);
+        } finally {
+            segmentFile.close();
+        }
+
+        assertThat(Files.exists(segmentFilePath), is(false));
+        
assertThat(Files.exists(indexFileManager.indexFilePath(fileProperties)), 
is(false));
+
+        // Validate that no files with increased generation have been created.
+        var newFileProperties = new FileProperties(fileProperties.ordinal(), 
fileProperties.generation() + 1);
+
+        assertThat(segmentFiles(), hasSize(segmentFiles.size() - 1));
+        
assertThat(Files.exists(fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties))),
 is(false));
+        
assertThat(Files.exists(indexFileManager.indexFilePath(newFileProperties)), 
is(false));
+    }
+
+    @Test
+    void testCompactSegmentFileWithSomeEntriesTruncated() throws Exception {
+        List<byte[]> batches = createRandomData(FILE_SIZE / 8, 10);
+
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(GROUP_ID_1, batches.get(i), i);
+            appendBytes(GROUP_ID_2, batches.get(i), i);
+        }
+
+        await().until(this::indexFiles, hasSize(greaterThan(0)));
+
+        List<Path> segmentFiles = segmentFiles();
+
+        long originalSize = Files.size(segmentFiles.get(0));
+
+        // Emulate truncation of one group
+        when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new 
GroupInfo(batches.size() - 1, batches.size() - 1));
+        when(groupInfoProvider.groupInfo(GROUP_ID_2)).thenReturn(new 
GroupInfo(1, batches.size() - 1));
+
+        Path firstSegmentFile = segmentFiles.get(0);
+
+        FileProperties originalFileProperties = 
SegmentFile.fileProperties(firstSegmentFile);
+
+        SegmentFile segmentFile = SegmentFile.openExisting(firstSegmentFile, 
false);
+        try {
+            garbageCollector.compactSegmentFile(segmentFile);
+        } finally {
+            segmentFile.close();
+        }
+
+        assertThat(Files.exists(firstSegmentFile), is(false));
+
+        var newFileProperties = new 
FileProperties(originalFileProperties.ordinal(), 
originalFileProperties.generation() + 1);
+
+        Path newSegmentFile = 
fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties));
+
+        assertThat(Files.exists(newSegmentFile), is(true));
+
+        assertThat(Files.size(newSegmentFile), lessThan(originalSize));
+
+        
assertThat(Files.exists(indexFileManager.indexFilePath(newFileProperties)), 
is(true));
+        
assertThat(Files.exists(indexFileManager.indexFilePath(originalFileProperties)),
 is(false));
+    }
+
+    @Test
+    void testCompactSegmentFileWithTruncationRecords() throws Exception {
+        List<byte[]> batches = createRandomData(FILE_SIZE / 4, 5);
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(GROUP_ID_1, batches.get(i), i);
+        }
+
+        fileManager.truncatePrefix(GROUP_ID_1, 2);
+        fileManager.truncateSuffix(GROUP_ID_1, 3);
+
+        await().until(this::indexFiles, hasSize(greaterThan(0)));
+
+        List<Path> segmentFiles = segmentFiles();
+        assertThat(segmentFiles, hasSize(greaterThan(1)));
+
+        // This is equivalent to prefix truncation up to the latest index.
+        when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new 
GroupInfo(batches.size() - 1, batches.size() - 1));
+
+        Path firstSegmentFile = segmentFiles.get(0);
+
+        SegmentFile segmentFile = SegmentFile.openExisting(firstSegmentFile, 
false);
+        try {
+            garbageCollector.compactSegmentFile(segmentFile);
+        } finally {
+            segmentFile.close();
+        }
+
+        assertThat(Files.exists(firstSegmentFile), is(false));
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-27964")
+    @RepeatedTest(10)
+    void testConcurrentCompactionAndReads() throws Exception {
+        List<byte[]> batches = createRandomData(FILE_SIZE / 10, 50);
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(GROUP_ID_1, batches.get(i), i);
+        }
+
+        await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() 
- 1)));
+
+        var firstAliveIndex = new AtomicLong();
+        var gcTaskDone = new AtomicBoolean(false);
+
+        when(groupInfoProvider.groupInfo(GROUP_ID_1))
+                .thenAnswer(invocationOnMock -> new 
GroupInfo(firstAliveIndex.get(), batches.size() - 1));
+
+        RunnableX gcTask = () -> {
+            try {
+                List<Path> segmentFiles = segmentFiles();
+
+                Path lastSegmentFile = segmentFiles.get(segmentFiles.size() - 
1);
+
+                // Don't compact the last segment file.
+                while (!segmentFiles.get(0).equals(lastSegmentFile)) {
+                    long index = firstAliveIndex.incrementAndGet();
+
+                    fileManager.truncatePrefix(GROUP_ID_1, index);
+
+                    SegmentFile segmentFile = 
SegmentFile.openExisting(segmentFiles.get(0), false);
+                    try {
+                        garbageCollector.compactSegmentFile(segmentFile);
+                    } finally {
+                        segmentFile.close();
+                    }
+
+                    segmentFiles = segmentFiles();
+                }
+            } finally {
+                gcTaskDone.set(true);
+            }
+        };
+
+        RunnableX readTask = () -> {
+            while (!gcTaskDone.get()) {
+                for (int i = 0; i < batches.size(); i++) {
+                    int index = i;
+
+                    fileManager.getEntry(GROUP_ID_1, i, bs -> {
+                        if (bs != null) {
+                            assertThat(bs, is(batches.get(index)));
+                        }
+                        return null;
+                    });
+                }
+            }
+        };
+
+        runRace(gcTask, readTask, readTask, readTask);
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-27964")
+    @RepeatedTest(10)
+    void testConcurrentCompactionAndReadsFromMultipleGroups() throws Exception 
{
+        List<byte[]> batches = createRandomData(FILE_SIZE / 10, 50);
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(GROUP_ID_1, batches.get(i), i);
+            appendBytes(GROUP_ID_2, batches.get(i), i);
+        }
+
+        await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size() 
- 1)));
+
+        var firstAliveIndex = new AtomicLong();
+        var gcTaskDone = new AtomicBoolean(false);
+
+        when(groupInfoProvider.groupInfo(GROUP_ID_1))
+                .thenAnswer(invocationOnMock -> new 
GroupInfo(firstAliveIndex.get(), batches.size() - 1));
+
+        // Group 2 is never compacted.
+        when(groupInfoProvider.groupInfo(GROUP_ID_2))
+                .thenAnswer(invocationOnMock -> new GroupInfo(0, 
batches.size() - 1));
+
+        RunnableX gcTask = () -> {
+            try {
+                List<Path> segmentFiles = segmentFiles();
+
+                Path curSegmentFilePath = segmentFiles.get(0);
+
+                // Unlike in the similar test, segment files will never be 
removed completely due to the presence of the second group.
+                // Because of that we need to use more complex logic to 
iterate over the segment files.
+                int segmentFilesIndex = 0;
+
+                while (true) {
+                    long index = firstAliveIndex.incrementAndGet();
+
+                    fileManager.truncatePrefix(GROUP_ID_1, index);
+
+                    FileProperties fileProperties = 
SegmentFile.fileProperties(curSegmentFilePath);
+
+                    long sizeBeforeCompaction = Files.size(curSegmentFilePath);
+
+                    SegmentFile segmentFile = 
SegmentFile.openExisting(curSegmentFilePath, false);
+                    try {
+                        garbageCollector.compactSegmentFile(segmentFile);
+                    } finally {
+                        segmentFile.close();
+                    }
+
+                    FileProperties newFileProperties = new 
FileProperties(fileProperties.ordinal(), fileProperties.generation() + 1);
+
+                    curSegmentFilePath = 
fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties));
+
+                    long sizeAfterCompaction = Files.size(curSegmentFilePath);
+
+                    // If the file's size didn't change, there's nothing left to compact, so we can switch to the next segment.
+                    if (sizeAfterCompaction == sizeBeforeCompaction) {
+                        segmentFilesIndex++;
+
+                        // Don't compact the last segment file.
+                        if (segmentFilesIndex == segmentFiles.size() - 1) {
+                            break;
+                        }
+
+                        curSegmentFilePath = 
segmentFiles.get(segmentFilesIndex);
+                    }
+                }
+            } finally {
+                gcTaskDone.set(true);
+            }
+        };
+
+        RunnableX readTaskFromGroup1 = () -> {
+            while (!gcTaskDone.get()) {
+                for (int i = 0; i < batches.size(); i++) {
+                    int index = i;
+
+                    fileManager.getEntry(GROUP_ID_1, i, bs -> {
+                        if (bs != null) {
+                            assertThat(bs, is(batches.get(index)));
+                        }
+                        return null;
+                    });
+                }
+            }
+        };
+
+        RunnableX readTaskFromGroup2 = () -> {
+            while (!gcTaskDone.get()) {
+                for (int i = 0; i < batches.size(); i++) {
+                    int index = i;
+
+                    fileManager.getEntry(GROUP_ID_2, i, bs -> {
+                        if (bs != null) {
+                            assertThat(bs, is(batches.get(index)));
+                        }
+                        return null;
+                    });
+                }
+            }
+        };
+
+        runRace(gcTask, readTaskFromGroup1, readTaskFromGroup1, 
readTaskFromGroup2, readTaskFromGroup2);
+    }
+
+    @Test
+    void testRecoveryAfterCompaction() throws Exception {
+        List<byte[]> batches = createRandomData(FILE_SIZE / 8, 10);
+
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(GROUP_ID_1, batches.get(i), i);
+            appendBytes(GROUP_ID_2, batches.get(i), i);
+        }
+
+        fileManager.truncatePrefix(GROUP_ID_1, batches.size() - 1);
+
+        await().until(this::indexFiles, hasSize(greaterThan(0)));
+
+        List<Path> segmentFiles = segmentFiles();
+
+        // Emulate truncation of one group
+        when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new 
GroupInfo(batches.size() - 1, batches.size() - 1));
+        when(groupInfoProvider.groupInfo(GROUP_ID_2)).thenReturn(new 
GroupInfo(1, batches.size() - 1));
+
+        SegmentFile segmentFile = 
SegmentFile.openExisting(segmentFiles.get(0), false);
+        try {
+            garbageCollector.compactSegmentFile(segmentFile);
+        } finally {
+            segmentFile.close();
+        }
+
+        fileManager.close();
+
+        fileManager = new SegmentFileManager(
+                NODE_NAME,
+                workDir,
+                STRIPES,
+                new NoOpFailureManager(),
+                groupInfoProvider,
+                raftConfiguration,
+                storageConfiguration
+        );
+
+        fileManager.start();
+
+        for (int i = 0; i < batches.size(); i++) {
+            int index = i;
+
+            fileManager.getEntry(GROUP_ID_1, i, bs -> {
+                assertThat(index, is(batches.size() - 1));
+                assertThat(bs, is(batches.get(index)));
+
+                return null;
+            });
+
+            fileManager.getEntry(GROUP_ID_2, i, bs -> {
+                assertThat(bs, is(batches.get(index)));
+                return null;
+            });
+        }
+    }
+
+    @Test
+    void testCleanupLeftoverFilesOnRecovery() throws Exception {
+        // Create temporary segment files
+        Path tmpFile1 = 
fileManager.segmentFilesDir().resolve("segment-0000000001-0000000000.bin.tmp");
+        Path tmpFile2 = 
fileManager.segmentFilesDir().resolve("segment-0000000002-0000000001.bin.tmp");
+
+        Files.createFile(tmpFile1);
+        Files.createFile(tmpFile2);
+
+        // Create duplicate segment and index files with same ordinal but 
different generations
+        Path segmentFile1Gen0 = 
fileManager.segmentFilesDir().resolve("segment-0000000003-0000000000.bin");
+        Path segmentFile1Gen1 = 
fileManager.segmentFilesDir().resolve("segment-0000000003-0000000001.bin");
+        Path indexFile1Gen0 = 
fileManager.indexFileManager().indexFilesDir().resolve("index-0000000003-0000000000.bin");
+        Path indexFile1Gen1 = 
fileManager.indexFileManager().indexFilesDir().resolve("index-0000000003-0000000001.bin");
+
+        Files.createFile(segmentFile1Gen0);
+        Files.createFile(segmentFile1Gen1);
+        Files.createFile(indexFile1Gen0);
+        Files.createFile(indexFile1Gen1);
+
+        // Create orphaned index files (no corresponding segment file)
+        Path orphanedIndexFile = 
fileManager.indexFileManager().indexFilesDir().resolve("index-0000000099-0000000000.bin");
+
+        Files.createFile(orphanedIndexFile);
+
+        fileManager.close();
+
+        fileManager = new SegmentFileManager(
+                NODE_NAME,
+                workDir,
+                STRIPES,
+                new NoOpFailureManager(),
+                groupInfoProvider,
+                raftConfiguration,
+                storageConfiguration
+        );
+
+        fileManager.garbageCollector().cleanupLeftoverFiles();
+
+        // Verify temporary files are cleaned up
+        assertFalse(Files.exists(tmpFile1));
+        assertFalse(Files.exists(tmpFile2));
+
+        // Verify duplicate segment files are cleaned up (lower generation 
removed)
+        assertFalse(Files.exists(segmentFile1Gen0));
+        assertTrue(Files.exists(segmentFile1Gen1));
+        assertFalse(Files.exists(indexFile1Gen0));
+        assertTrue(Files.exists(indexFile1Gen1));
+
+        // Verify orphaned index files are cleaned up
+        assertFalse(Files.exists(orphanedIndexFile));
+    }
+
+    private List<Path> segmentFiles() throws IOException {
+        try (Stream<Path> files = Files.list(fileManager.segmentFilesDir())) {
+            return files
+                    .filter(p -> !p.getFileName().toString().endsWith(".tmp"))
+                    .sorted()
+                    .collect(Collectors.toList());
+        }
+    }
+
+    private List<Path> indexFiles() throws IOException {
+        try (Stream<Path> files = Files.list(fileManager.indexFilesDir())) {
+            return files
+                    .filter(p -> !p.getFileName().toString().endsWith(".tmp"))
+                    .sorted()
+                    .collect(Collectors.toList());
+        }
+    }
+
+    private static List<byte[]> createRandomData(int batchLength, int 
numBatches) {
+        return IntStream.range(0, numBatches)
+                .mapToObj(i -> randomBytes(ThreadLocalRandom.current(), 
batchLength))
+                .collect(Collectors.toList());
+    }
+
+    private void appendBytes(long groupId, byte[] serializedEntry, long index) 
throws IOException {
+        var entry = new LogEntry();
+        entry.setId(new LogId(index, 0));
+
+        fileManager.appendEntry(groupId, entry, new LogEntryEncoder() {
+            @Override
+            public byte[] encode(LogEntry log) {
+                return serializedEntry;
+            }
+
+            @Override
+            public void encode(ByteBuffer buffer, LogEntry log) {
+                buffer.put(serializedEntry);
+            }
+
+            @Override
+            public int size(LogEntry logEntry) {
+                return serializedEntry.length;
+            }
+        });
+    }
+}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
index 3debc97b513..27b182a7124 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
@@ -87,6 +87,7 @@ class SegmentFileManagerGetEntryTest extends 
IgniteAbstractTest {
                 workDir,
                 STRIPES,
                 new NoOpFailureManager(),
+                groupId -> null,
                 raftConfiguration,
                 storageConfiguration
         );
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index 9bf3099fb7e..a11a96588db 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
+import static java.util.Collections.emptyIterator;
 import static java.util.Comparator.comparingLong;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.runAsync;
@@ -118,6 +119,7 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
                 workDir,
                 STRIPES,
                 failureManager,
+                GroupInfoProvider.NO_OP,
                 raftConfiguration,
                 storageConfiguration
         );
@@ -489,7 +491,9 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
         // Use a mock memtable that throws an exception to force the index 
manager to create a temporary index file, but not rename it.
         ReadModeIndexMemTable mockMemTable = mock(ReadModeIndexMemTable.class);
 
-        when(mockMemTable.iterator()).thenThrow(new RuntimeException("Test 
exception"));
+        when(mockMemTable.iterator())
+                .thenReturn(emptyIterator())
+                .thenThrow(new RuntimeException("Test exception"));
 
         // Create a tmp file for the incomplete segment file.
         try {
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
index bd053f0468d..60cfafe065c 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
@@ -43,6 +43,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
@@ -112,7 +113,7 @@ class SegmentFileTest extends IgniteAbstractTest {
 
     @Test
     void testOpenExistingConstructorInvariants() throws IOException {
-        assertThrows(IllegalArgumentException.class, () -> 
SegmentFile.openExisting(path, false));
+        assertThrows(NoSuchFileException.class, () -> 
SegmentFile.openExisting(path, false));
 
         createSegmentFile(1);
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
index 98053c13787..ca7ce4c605d 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
@@ -61,6 +61,7 @@ class SegstoreLogStorageConcurrencyTest extends 
IgniteAbstractTest {
                 workDir,
                 1,
                 new NoOpFailureManager(),
+                GroupInfoProvider.NO_OP,
                 raftConfiguration,
                 storageConfiguration
         );
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index c058d46078d..9009e122163 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -64,6 +64,7 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
                     path,
                     1,
                     new NoOpFailureManager(),
+                    GroupInfoProvider.NO_OP,
                     raftConfiguration,
                     storageConfiguration
             );

Reply via email to