This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e1b9752ac83 IGNITE-26285 Implement truncatePrefix operation (#7101)
e1b9752ac83 is described below

commit e1b9752ac8371769a475774838fa427adbbbbf8c
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Dec 8 18:23:31 2025 +0200

    IGNITE-26285 Implement truncatePrefix operation (#7101)
---
 .../raft/storage/segstore/EntrySearchResult.java   |  87 +++++++++++
 .../raft/storage/segstore/GroupIndexMeta.java      |  74 +++++++++-
 .../raft/storage/segstore/IndexFileManager.java    |  95 +++++++++---
 .../raft/storage/segstore/IndexFileMeta.java       |   3 +
 .../raft/storage/segstore/IndexFileMetaArray.java  |  55 ++++++-
 .../raft/storage/segstore/IndexMemTable.java       |  53 +++++--
 .../raft/storage/segstore/RaftLogCheckpointer.java |  30 ++--
 .../raft/storage/segstore/SegmentFileManager.java  | 160 +++++++++++++--------
 .../raft/storage/segstore/SegmentInfo.java         | 149 ++++++++++++++++---
 .../raft/storage/segstore/SegmentPayload.java      |  48 ++++---
 .../raft/storage/segstore/SegstoreLogStorage.java  |  10 +-
 .../storage/segstore/WriteModeIndexMemTable.java   |   5 +
 .../raft/storage/segstore/CheckpointQueueTest.java |   8 +-
 .../segstore/DeserializedSegmentPayload.java       |   8 +-
 .../raft/storage/segstore/GroupIndexMetaTest.java  |  85 +++++++++--
 .../storage/segstore/IndexFileManagerTest.java     | 116 ++++++++++++++-
 .../storage/segstore/IndexFileMetaArrayTest.java   |  39 +++++
 .../raft/storage/segstore/IndexMemTableTest.java   | 149 ++++++++++++++++---
 .../storage/segstore/RaftLogCheckpointerTest.java  |  78 ++++++++--
 .../storage/segstore/SegmentFileManagerTest.java   |  94 ++++++++++--
 .../SegstoreLogStorageConcurrencyTest.java         |  40 ++++++
 .../storage/segstore/SegstoreLogStorageTest.java   |  31 +++-
 22 files changed, 1204 insertions(+), 213 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/EntrySearchResult.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/EntrySearchResult.java
new file mode 100644
index 00000000000..0aa3f44c49a
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/EntrySearchResult.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static 
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome.CONTINUE_SEARCH;
+import static 
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome.NOT_FOUND;
+import static 
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome.SUCCESS;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class representing the result of an entry search in the log storage.
+ *
+ * <p>It is used to represent three search outcomes:
+ *
+ * <ol>
+ *     <li>If {@link EntrySearchResult#searchOutcome()} is {@link 
SearchOutcome#CONTINUE_SEARCH}, then the entry was not found and we should
+ *     continue looking in other places;</li>
+ *     <li>If {@link EntrySearchResult#searchOutcome()} is {@link 
SearchOutcome#NOT_FOUND}, then the entry was not found and we know for
+ *     sure that it does not exist in the storage;</li>
+ *     <li>If {@link EntrySearchResult#searchOutcome()} is {@link 
SearchOutcome#SUCCESS}, then the corresponding entry has been found
+ *     successfully and {@link EntrySearchResult#entryBuffer()} method can be 
used to obtain the entry value.</li>
+ * </ol>
+ */
+class EntrySearchResult {
+    enum SearchOutcome {
+        SUCCESS, NOT_FOUND, CONTINUE_SEARCH
+    }
+
+    private static final EntrySearchResult NOT_FOUND_RESULT = new 
EntrySearchResult(null, NOT_FOUND);
+
+    private static final EntrySearchResult CONTINUE_SEARCH_RESULT = new 
EntrySearchResult(null, CONTINUE_SEARCH);
+
+    @Nullable
+    private final ByteBuffer entryBuffer;
+
+    private final SearchOutcome searchOutcome;
+
+    private EntrySearchResult(@Nullable ByteBuffer entryBuffer, SearchOutcome 
searchOutcome) {
+        this.entryBuffer = entryBuffer;
+        this.searchOutcome = searchOutcome;
+    }
+
+    ByteBuffer entryBuffer() {
+        assert entryBuffer != null : "Search result is empty";
+
+        return entryBuffer;
+    }
+
+    SearchOutcome searchOutcome() {
+        return searchOutcome;
+    }
+
+    static EntrySearchResult success(ByteBuffer entryBuffer) {
+        return new EntrySearchResult(entryBuffer, SUCCESS);
+    }
+
+    static EntrySearchResult notFound() {
+        return NOT_FOUND_RESULT;
+    }
+
+    static EntrySearchResult continueSearch() {
+        return CONTINUE_SEARCH_RESULT;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
index a727216a180..912fedb951a 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
@@ -49,8 +49,27 @@ class GroupIndexMeta {
         void addIndexMeta(IndexFileMeta indexFileMeta) {
             IndexFileMetaArray fileMetas = this.fileMetas;
 
-            IndexFileMetaArray newFileMetas = fileMetas.add(indexFileMeta);
+            setFileMetas(fileMetas, fileMetas.add(indexFileMeta));
+        }
+
+        long firstLogIndexInclusive() {
+            return fileMetas.firstLogIndexInclusive();
+        }
+
+        long lastLogIndexExclusive() {
+            return fileMetas.lastLogIndexExclusive();
+        }
+
+        /**
+         * Removes all metas which log indices are smaller than the given 
value.
+         */
+        void truncateIndicesSmallerThan(long firstLogIndexKept) {
+            IndexFileMetaArray fileMetas = this.fileMetas;
+
+            setFileMetas(fileMetas, 
fileMetas.truncateIndicesSmallerThan(firstLogIndexKept));
+        }
 
+        private void setFileMetas(IndexFileMetaArray fileMetas, 
IndexFileMetaArray newFileMetas) {
             // Simple assignment would suffice, since we only have one thread 
writing to this field, but we use compareAndSet to verify
             // this invariant, just in case.
             boolean updated = FILE_METAS_VH.compareAndSet(this, fileMetas, 
newFileMetas);
@@ -59,6 +78,19 @@ class GroupIndexMeta {
         }
     }
 
+    /**
+     * A deque of index file meta blocks.
+     *
+     * <p>When a new index file is created, its meta is appended to the last 
block (represented by a {@link IndexMetaArrayHolder}) of the
+     * deque if its {@link IndexFileMeta#firstLogIndexInclusive()} matches the 
most recent {@link IndexFileMeta#lastLogIndexExclusive()} in
+     * the block. I.e. consecutive index file metas are merged into a single 
block, no new elements are added to the deque.
+     *
+     * <p>The only case when a new block is added to the deque is if a log 
suffix truncation happened somewhere during an index file's
+     * lifecycle. In this case, the new block will have its {@link 
IndexFileMeta#firstLogIndexInclusive()} smaller than the most recent
+     * block's {@link IndexFileMeta#lastLogIndexExclusive()} (two blocks 
"overlap"). During search, the newer block will be checked first,
+     * so elements in the range {@code [newBlock#firstLogIndexInclusive : 
oldBlock#lastLogIndexExclusive)} will be taken from the new block,
+     * effectively overriding the old block's entries in this range.
+     */
     private final Deque<IndexMetaArrayHolder> fileMetaDeque = new 
ConcurrentLinkedDeque<>();
 
     GroupIndexMeta(IndexFileMeta startFileMeta) {
@@ -68,7 +100,7 @@ class GroupIndexMeta {
     void addIndexMeta(IndexFileMeta indexFileMeta) {
         IndexMetaArrayHolder curFileMetas = fileMetaDeque.getLast();
 
-        long curLastLogIndex = curFileMetas.fileMetas.lastLogIndexExclusive();
+        long curLastLogIndex = curFileMetas.lastLogIndexExclusive();
 
         long newFirstLogIndex = indexFileMeta.firstLogIndexInclusive();
 
@@ -117,9 +149,43 @@ class GroupIndexMeta {
         return null;
     }
 
+    /**
+     * Removes all index metas that have log indices smaller than the given 
value.
+     */
+    void truncatePrefix(long firstLogIndexKept) {
+        Iterator<IndexMetaArrayHolder> it = fileMetaDeque.descendingIterator();
+
+        // Find the most recent entry, which first index is smaller than 
firstLogIndexKept.
+        while (it.hasNext()) {
+            IndexMetaArrayHolder holder = it.next();
+
+            long firstLogIndex = holder.firstLogIndexInclusive();
+
+            if (firstLogIndex == firstLogIndexKept) {
+                // We are right on the edge of meta range, keep this entry and 
simply drop everything older.
+                break;
+            } else if (firstLogIndex < firstLogIndexKept) {
+                // Truncate this entry (possibly in its entirety) and drop 
everything older.
+                if (holder.lastLogIndexExclusive() <= firstLogIndexKept) {
+                    it.remove();
+                } else {
+                    holder.truncateIndicesSmallerThan(firstLogIndexKept);
+                }
+
+                break;
+            }
+        }
+
+        // Remove all remaining entries.
+        while (it.hasNext()) {
+            it.next();
+            it.remove();
+        }
+    }
+
     long firstLogIndexInclusive() {
         for (IndexMetaArrayHolder indexMetaArrayHolder : fileMetaDeque) {
-            long firstLogIndex = 
indexMetaArrayHolder.fileMetas.firstLogIndexInclusive();
+            long firstLogIndex = indexMetaArrayHolder.firstLogIndexInclusive();
 
             // "firstLogIndexInclusive" can return -1 of the index file does 
not contain any entries for this group, only the truncation
             // record.
@@ -132,6 +198,6 @@ class GroupIndexMeta {
     }
 
     long lastLogIndexExclusive() {
-        return fileMetaDeque.getLast().fileMetas.lastLogIndexExclusive();
+        return fileMetaDeque.getLast().lastLogIndexExclusive();
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index 967dc9702a0..a2f3a74617c 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
+import static java.lang.Math.toIntExact;
 import static java.nio.file.StandardOpenOption.CREATE_NEW;
 import static java.nio.file.StandardOpenOption.WRITE;
 import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
@@ -64,14 +65,16 @@ import org.jetbrains.annotations.Nullable;
  * +------------------------------------------------------------------+
  * </pre>
  *
- * <p>Raft group meta is as follows:
- * <pre>
- * 
+------------------------------------------------------------------------------------------------------------------------+-----+
- * |                                              Raft group 1 meta            
                                             | ... |
- * 
+------------------------------------------------------------------------------------------------------------------------+-----+
- * | Group ID (8 bytes) | Flags (4 bytes) | Payload Offset (4 bytes) | First 
Log Index (8 bytes) | Last Log Index (8 bytes) | ... |
- * 
+------------------------------------------------------------------------------------------------------------------------+-----+
- * </pre>
+ * <p>Each Raft group meta is as follows (written as a list, because the table 
doesn't fit the configured line length):
+ * <ol>
+ *     <li>Group ID (8 bytes);</li>
+ *     <li>Flags (4 bytes);</li>
+ *     <li>Payload offset (4 bytes);</li>
+ *     <li>First log index (8 bytes, inclusive);</li>
+ *     <li>Last log index (8 bytes, exclusive);</li>
+ *     <li>First log index kept (8 bytes): used during prefix truncation, 
either equal to first index kept if prefix was truncated at least
+ *     once during the index file lifecycle, otherwise equal to {@code 
-1}.</li>
+ * </ol>
  *
  * <p>Payload of the index files has the following structure:
  * <pre>
@@ -101,11 +104,14 @@ class IndexFileManager {
 
     private static final String TMP_FILE_SUFFIX = ".tmp";
 
+    /** Size of the segment file offset entry (used as the payload of an index 
file). */
+    static final int SEGMENT_FILE_OFFSET_SIZE = Integer.BYTES;
+
     // Magic number + format version + number of Raft groups.
     static final int COMMON_META_SIZE = Integer.BYTES + Integer.BYTES + 
Integer.BYTES;
 
-    // Group ID + flags + file offset + start log index + end log index.
-    static final int GROUP_META_SIZE = Long.BYTES + Integer.BYTES + 
Integer.BYTES + Long.BYTES + Long.BYTES;
+    // Group ID + flags + file offset + start log index + end log index + 
first index kept.
+    static final int GROUP_META_SIZE = Long.BYTES + Integer.BYTES + 
Integer.BYTES + Long.BYTES + Long.BYTES + Long.BYTES;
 
     static final ByteOrder BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
 
@@ -183,7 +189,12 @@ class IndexFileManager {
             Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
 
             while (it.hasNext()) {
-                os.write(payload(it.next().getValue()));
+                SegmentInfo segmentInfo = it.next().getValue();
+
+                // Segment Info may not contain payload in case of suffix 
truncation, see "IndexMemTable#truncateSuffix".
+                if (segmentInfo.size() > 0) {
+                    os.write(payload(segmentInfo));
+                }
             }
         }
 
@@ -293,11 +304,15 @@ class IndexFileManager {
 
             long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive();
 
+            long firstIndexKept = segmentInfo.firstIndexKept();
+
             // On recovery we are only creating missing index files, in-memory 
meta will be created on Index File Manager start.
             if (!onRecovery) {
-                var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, fileOrdinal);
+                IndexFileMeta indexFileMeta = createIndexFileMeta(
+                        firstLogIndexInclusive, lastLogIndexExclusive, 
firstIndexKept, payloadOffset, fileOrdinal
+                );
 
-                putIndexFileMeta(groupId, indexFileMeta);
+                putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
             }
 
             headerBuffer
@@ -305,7 +320,8 @@ class IndexFileManager {
                     .putInt(0) // Flags.
                     .putInt(payloadOffset)
                     .putLong(firstLogIndexInclusive)
-                    .putLong(lastLogIndexExclusive);
+                    .putLong(lastLogIndexExclusive)
+                    .putLong(firstIndexKept);
 
             payloadOffset += payloadSize(segmentInfo);
         }
@@ -313,13 +329,51 @@ class IndexFileManager {
         return headerBuffer.array();
     }
 
-    private void putIndexFileMeta(Long groupId, IndexFileMeta indexFileMeta) {
+    private static @Nullable IndexFileMeta createIndexFileMeta(
+            long firstLogIndexInclusive,
+            long lastLogIndexExclusive,
+            long firstIndexKept,
+            int payloadOffset,
+            int fileOrdinal
+    ) {
+        if (firstLogIndexInclusive == -1) {
+            assert firstIndexKept != -1 : "Expected a prefix tombstone, but 
firstIndexKept is not set.";
+
+            // This is a "prefix tombstone", no need to create any meta, we 
will just truncate the prefix.
+            return null;
+        }
+
+        if (firstIndexKept == -1 || firstIndexKept <= firstLogIndexInclusive) {
+            // No prefix truncation required, simply create a new meta.
+            return new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, fileOrdinal);
+        }
+
+        // Create a meta with a truncated prefix.
+        int numEntriesToSkip = toIntExact(firstIndexKept - 
firstLogIndexInclusive);
+
+        int adjustedPayloadOffset = payloadOffset + numEntriesToSkip * 
SEGMENT_FILE_OFFSET_SIZE;
+
+        return new IndexFileMeta(firstIndexKept, lastLogIndexExclusive, 
adjustedPayloadOffset, fileOrdinal);
+    }
+
+    private void putIndexFileMeta(Long groupId, @Nullable IndexFileMeta 
indexFileMeta, long firstIndexKept) {
         GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId);
 
         if (existingGroupIndexMeta == null) {
-            groupIndexMetas.put(groupId, new GroupIndexMeta(indexFileMeta));
+            if (indexFileMeta != null) {
+                groupIndexMetas.put(groupId, new 
GroupIndexMeta(indexFileMeta));
+            }
         } else {
-            existingGroupIndexMeta.addIndexMeta(indexFileMeta);
+            if (firstIndexKept != -1) {
+                existingGroupIndexMeta.truncatePrefix(firstIndexKept);
+            }
+
+            if (indexFileMeta != null) {
+                // New index meta must have already been truncated according 
to the prefix tombstone.
+                assert indexFileMeta.firstLogIndexInclusive() >= 
firstIndexKept : indexFileMeta;
+
+                existingGroupIndexMeta.addIndexMeta(indexFileMeta);
+            }
         }
     }
 
@@ -392,10 +446,13 @@ class IndexFileManager {
                 int payloadOffset = groupMetaBuffer.getInt();
                 long firstLogIndexInclusive = groupMetaBuffer.getLong();
                 long lastLogIndexExclusive = groupMetaBuffer.getLong();
+                long firstIndexKept = groupMetaBuffer.getLong();
 
-                var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, curFileOrdinal);
+                IndexFileMeta indexFileMeta = createIndexFileMeta(
+                        firstLogIndexInclusive, lastLogIndexExclusive, 
firstIndexKept, payloadOffset, fileOrdinal
+                );
 
-                putIndexFileMeta(groupId, indexFileMeta);
+                putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
             }
         }
     }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
index 34bd73e12d2..f75cc00ba1d 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
@@ -34,6 +34,9 @@ class IndexFileMeta {
     private final int indexFileOrdinal;
 
     IndexFileMeta(long firstLogIndexInclusive, long lastLogIndexExclusive, int 
indexFilePayloadOffset, int indexFileOrdinal) {
+        assert firstLogIndexInclusive >= 0 : "Invalid first log index: " + 
firstLogIndexInclusive;
+        assert lastLogIndexExclusive >= 0 : "Invalid first log index: " + 
firstLogIndexInclusive;
+
         if (lastLogIndexExclusive < firstLogIndexInclusive) {
             throw new IllegalArgumentException("Invalid log index range: [" + 
firstLogIndexInclusive + ", " + lastLogIndexExclusive + ").");
         }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
index 1256faa4713..8d74c8f267d 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
+import static java.lang.Math.toIntExact;
+import static 
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.SEGMENT_FILE_OFFSET_SIZE;
+
 import java.util.Arrays;
 import org.jetbrains.annotations.Nullable;
 
@@ -93,6 +96,12 @@ class IndexFileMetaArray {
      */
     @Nullable
     IndexFileMeta find(long logIndex) {
+        int arrayIndex = findArrayIndex(logIndex);
+
+        return arrayIndex == -1 ? null : array[arrayIndex];
+    }
+
+    private int findArrayIndex(long logIndex) {
         int lowArrayIndex = 0;
         int highArrayIndex = size - 1;
 
@@ -106,10 +115,52 @@ class IndexFileMetaArray {
             } else if (logIndex >= midValue.lastLogIndexExclusive()) {
                 lowArrayIndex = middleArrayIndex + 1;
             } else {
-                return midValue;
+                return middleArrayIndex;
             }
         }
 
-        return null;
+        return -1;
+    }
+
+    IndexFileMetaArray truncateIndicesSmallerThan(long firstLogIndexKept) {
+        int firstLogIndexKeptArrayIndex = findArrayIndex(firstLogIndexKept);
+
+        assert firstLogIndexKeptArrayIndex >= 0 : String.format(
+                "Missing entry for log index %d in range [%d:%d).",
+                firstLogIndexKept, firstLogIndexInclusive(), 
lastLogIndexExclusive()
+        );
+
+        IndexFileMeta metaToUpdate = array[firstLogIndexKeptArrayIndex];
+
+        int numEntriesToSkip = toIntExact(firstLogIndexKept - 
metaToUpdate.firstLogIndexInclusive());
+
+        assert numEntriesToSkip >= 0 : String.format(
+                "Trying to do a no-op prefix truncate from index %d in range 
[%d:%d).",
+                firstLogIndexKept, firstLogIndexInclusive(), 
lastLogIndexExclusive()
+        );
+
+        // Move the payload offset pointer to skip truncated entries (each 
entry is 4 bytes).
+        int adjustedPayloadOffset = metaToUpdate.indexFilePayloadOffset() + 
numEntriesToSkip * SEGMENT_FILE_OFFSET_SIZE;
+
+        var trimmedMeta = new IndexFileMeta(
+                firstLogIndexKept,
+                metaToUpdate.lastLogIndexExclusive(),
+                adjustedPayloadOffset,
+                metaToUpdate.indexFileOrdinal()
+        );
+
+        // Create a new array: the trimmed meta becomes the first element, 
other elements with "firstLogIndexInclusive" larger
+        // than "firstLogIndexKept" are copied from the old array.
+        IndexFileMeta[] newArray = new IndexFileMeta[array.length];
+
+        newArray[0] = trimmedMeta;
+
+        int newSize = size - firstLogIndexKeptArrayIndex;
+
+        if (newSize > 1) {
+            System.arraycopy(array, firstLogIndexKeptArrayIndex + 1, newArray, 
1, newSize - 1);
+        }
+
+        return new IndexFileMetaArray(newArray, newSize);
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
index 2ed981a53d5..e9f0eff6f25 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
@@ -46,7 +46,7 @@ class IndexMemTable implements WriteModeIndexMemTable, 
ReadModeIndexMemTable {
         // File offset can be less than 0 (it's treated as an unsigned 
integer) but never 0, because of the file header.
         assert segmentFileOffset != 0 : String.format("Segment file offset 
must not be 0 [groupId=%d]", groupId);
 
-        ConcurrentMap<Long, SegmentInfo> memTable = stripe(groupId).memTable;
+        ConcurrentMap<Long, SegmentInfo> memTable = memtable(groupId);
 
         SegmentInfo segmentInfo = memTable.get(groupId);
 
@@ -55,6 +55,12 @@ class IndexMemTable implements WriteModeIndexMemTable, 
ReadModeIndexMemTable {
 
             segmentInfo.addOffset(logIndex, segmentFileOffset);
 
+            memTable.put(groupId, segmentInfo);
+        } else if (segmentInfo.isPrefixTombstone()) {
+            segmentInfo = new SegmentInfo(logIndex, 
segmentInfo.firstIndexKept());
+
+            segmentInfo.addOffset(logIndex, segmentFileOffset);
+
             memTable.put(groupId, segmentInfo);
         } else {
             segmentInfo.addOffset(logIndex, segmentFileOffset);
@@ -63,23 +69,40 @@ class IndexMemTable implements WriteModeIndexMemTable, 
ReadModeIndexMemTable {
 
     @Override
     public SegmentInfo segmentInfo(long groupId) {
-        return stripe(groupId).memTable.get(groupId);
+        return memtable(groupId).get(groupId);
     }
 
     @Override
     public void truncateSuffix(long groupId, long lastLogIndexKept) {
-        ConcurrentMap<Long, SegmentInfo> memtable = stripe(groupId).memTable;
-
-        SegmentInfo segmentInfo = memtable.get(groupId);
+        ConcurrentMap<Long, SegmentInfo> memtable = memtable(groupId);
+
+        memtable.compute(groupId, (id, segmentInfo) -> {
+            if (segmentInfo == null || lastLogIndexKept < 
segmentInfo.firstLogIndexInclusive()) {
+                // If the current memtable does not have information for the 
given group or if we are truncating everything currently
+                // present in the memtable, we need to write a special "empty" 
SegmentInfo into the memtable to override existing persisted
+                // data during search.
+                return new SegmentInfo(lastLogIndexKept + 1);
+            } else if (segmentInfo.isPrefixTombstone()) {
+                // This is a prefix tombstone inserted by "truncatePrefix".
+                return new SegmentInfo(lastLogIndexKept + 1, 
segmentInfo.firstIndexKept());
+            } else {
+                return segmentInfo.truncateSuffix(lastLogIndexKept);
+            }
+        });
+    }
 
-        if (segmentInfo == null || lastLogIndexKept < 
segmentInfo.firstLogIndexInclusive()) {
-            // If the current memtable does not have information for the given 
group or if we are truncating everything currently present
-            // in the memtable, we need to write a special "empty" SegmentInfo 
into the memtable to override existing persisted data during
-            // search.
-            memtable.put(groupId, new SegmentInfo(lastLogIndexKept + 1));
-        } else {
-            segmentInfo.truncateSuffix(lastLogIndexKept);
-        }
+    @Override
+    public void truncatePrefix(long groupId, long firstIndexKept) {
+        ConcurrentMap<Long, SegmentInfo> memtable = memtable(groupId);
+
+        memtable.compute(groupId, (id, segmentInfo) -> {
+            if (segmentInfo == null) {
+                // The memtable does not have any information for the given 
group, we need to write a special "prefix tombstone".
+                return SegmentInfo.prefixTombstone(firstIndexKept);
+            } else {
+                return segmentInfo.truncatePrefix(firstIndexKept);
+            }
+        });
     }
 
     @Override
@@ -121,6 +144,10 @@ class IndexMemTable implements WriteModeIndexMemTable, 
ReadModeIndexMemTable {
         return stripes[stripeIndex];
     }
 
+    private ConcurrentMap<Long, SegmentInfo> memtable(long groupId) {
+        return stripe(groupId).memTable;
+    }
+
     private class SegmentInfoIterator implements Iterator<Entry<Long, 
SegmentInfo>> {
         private int stripeIndex = 0;
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
index e5bb28e077e..e3400628b4a 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.storage.segstore;
 
 import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.SWITCH_SEGMENT_RECORD;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
 import static org.apache.ignite.lang.ErrorGroups.Marshalling.COMMON_ERR;
 
@@ -31,7 +32,6 @@ import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.raft.storage.segstore.CheckpointQueue.Entry;
 import org.apache.ignite.internal.thread.IgniteThread;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Class responsible for running periodic checkpoint tasks.
@@ -104,11 +104,8 @@ class RaftLogCheckpointer {
 
     /**
      * Searches for the segment payload corresponding to the given Raft Group 
ID and Raft Log Index in the checkpoint queue.
-     *
-     * @return {@code ByteBuffer} which position is set to the start of the 
corresponding segment payload or {@code null} if the payload has
-     *         not been found in all files currently present in the queue.
      */
-    @Nullable ByteBuffer findSegmentPayloadInQueue(long groupId, long 
logIndex) {
+    EntrySearchResult findSegmentPayloadInQueue(long groupId, long logIndex) {
         Iterator<Entry> it = queue.tailIterator();
 
         while (it.hasNext()) {
@@ -116,14 +113,29 @@ class RaftLogCheckpointer {
 
             SegmentInfo segmentInfo = e.memTable().segmentInfo(groupId);
 
-            int segmentPayloadOffset = segmentInfo == null ? 0 : 
segmentInfo.getOffset(logIndex);
+            if (segmentInfo == null) {
+                continue;
+            }
+
+            if (logIndex >= segmentInfo.lastLogIndexExclusive()) {
+                return EntrySearchResult.notFound();
+            }
+
+            if (logIndex < segmentInfo.firstIndexKept()) {
+                // This is a prefix tombstone and it cuts off the log index we 
search for.
+                return EntrySearchResult.notFound();
+            }
+
+            int segmentPayloadOffset = segmentInfo.getOffset(logIndex);
+
+            if (segmentPayloadOffset != MISSING_SEGMENT_FILE_OFFSET) {
+                ByteBuffer entryBuffer = 
e.segmentFile().buffer().position(segmentPayloadOffset);
 
-            if (segmentPayloadOffset != 0) {
-                return e.segmentFile().buffer().position(segmentPayloadOffset);
+                return EntrySearchResult.success(entryBuffer);
             }
         }
 
-        return null;
+        return EntrySearchResult.continueSearch();
     }
 
     private class CheckpointTask implements Runnable {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 62222005d00..dedbbb7b37b 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
-import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.HASH_SIZE_BYTES;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.CRC_SIZE_BYTES;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_PREFIX_RECORD_MARKER;
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_PREFIX_RECORD_SIZE;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_SUFFIX_RECORD_MARKER;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_SUFFIX_RECORD_SIZE;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -38,6 +41,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import 
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome;
 import 
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer;
 import org.apache.ignite.internal.raft.util.VarlenEncoder;
 import org.apache.ignite.internal.util.FastCrc;
@@ -278,43 +282,70 @@ class SegmentFileManager implements ManuallyCloseable {
     }
 
     private @Nullable ByteBuffer getEntry(long groupId, long logIndex) throws 
IOException {
-        // First, read from the current segment file.
+        EntrySearchResult searchResult = getEntryFromCurrentMemtable(groupId, 
logIndex);
+
+        if (searchResult.searchOutcome() == SearchOutcome.CONTINUE_SEARCH) {
+            searchResult = checkpointer.findSegmentPayloadInQueue(groupId, 
logIndex);
+
+            if (searchResult.searchOutcome() == SearchOutcome.CONTINUE_SEARCH) 
{
+                searchResult = readFromOtherSegmentFiles(groupId, logIndex);
+            }
+        }
+
+        switch (searchResult.searchOutcome()) {
+            case SUCCESS: return searchResult.entryBuffer();
+            case NOT_FOUND: return null;
+            default: throw new IllegalStateException("Unexpected search 
outcome: " + searchResult.searchOutcome());
+        }
+    }
+
+    private EntrySearchResult getEntryFromCurrentMemtable(long groupId, long 
logIndex) {
         SegmentFileWithMemtable currentSegmentFile = 
this.currentSegmentFile.get();
 
         SegmentInfo segmentInfo = 
currentSegmentFile.memtable().segmentInfo(groupId);
 
-        if (segmentInfo != null) {
-            if (logIndex >= segmentInfo.lastLogIndexExclusive()) {
-                return null;
-            }
+        if (segmentInfo == null) {
+            return EntrySearchResult.continueSearch();
+        }
 
-            int segmentPayloadOffset = segmentInfo.getOffset(logIndex);
+        if (logIndex >= segmentInfo.lastLogIndexExclusive()) {
+            return EntrySearchResult.notFound();
+        }
 
-            if (segmentPayloadOffset != 0) {
-                return 
currentSegmentFile.segmentFile().buffer().position(segmentPayloadOffset);
-            }
+        if (logIndex < segmentInfo.firstIndexKept()) {
+            // This is a prefix tombstone and it cuts off the log index we 
search for.
+            return EntrySearchResult.notFound();
         }
 
-        ByteBuffer bufferFromCheckpointQueue = 
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
+        int segmentPayloadOffset = segmentInfo.getOffset(logIndex);
 
-        if (bufferFromCheckpointQueue != null) {
-            return bufferFromCheckpointQueue;
+        if (segmentPayloadOffset == MISSING_SEGMENT_FILE_OFFSET) {
+            return EntrySearchResult.continueSearch();
         }
 
-        return readFromOtherSegmentFiles(groupId, logIndex);
+        ByteBuffer entryBuffer = 
currentSegmentFile.segmentFile().buffer().position(segmentPayloadOffset);
+
+        return EntrySearchResult.success(entryBuffer);
     }
 
     void truncateSuffix(long groupId, long lastLogIndexKept) throws 
IOException {
         try (WriteBufferWithMemtable writeBufferWithMemtable = 
reserveBytesWithRollover(TRUNCATE_SUFFIX_RECORD_SIZE)) {
-            ByteBuffer segmentBuffer = writeBufferWithMemtable.buffer();
-
-            SegmentPayload.writeTruncateSuffixRecordTo(segmentBuffer, groupId, 
lastLogIndexKept);
+            
SegmentPayload.writeTruncateSuffixRecordTo(writeBufferWithMemtable.buffer(), 
groupId, lastLogIndexKept);
 
             // Modify the memtable before write buffer is released to avoid 
races with checkpoint on rollover.
             writeBufferWithMemtable.memtable().truncateSuffix(groupId, 
lastLogIndexKept);
         }
     }
 
+    void truncatePrefix(long groupId, long firstLogIndexKept) throws 
IOException {
+        try (WriteBufferWithMemtable writeBufferWithMemtable = 
reserveBytesWithRollover(TRUNCATE_PREFIX_RECORD_SIZE)) {
+            
SegmentPayload.writeTruncatePrefixRecordTo(writeBufferWithMemtable.buffer(), 
groupId, firstLogIndexKept);
+
+            // Modify the memtable before write buffer is released to avoid 
races with checkpoint on rollover.
+            writeBufferWithMemtable.memtable().truncatePrefix(groupId, 
firstLogIndexKept);
+        }
+    }
+
     private WriteBufferWithMemtable reserveBytesWithRollover(int size) throws 
IOException {
         while (true) {
             SegmentFileWithMemtable segmentFileWithMemtable = 
currentSegmentFile();
@@ -337,17 +368,22 @@ class SegmentFileManager implements ManuallyCloseable {
      * storage, not taking pending in-memory state into account.
      */
     long firstLogIndexInclusiveOnRecovery(long groupId) {
+        SegmentFileWithMemtable currentSegmentFile = 
this.currentSegmentFile.get();
+
+        SegmentInfo segmentInfo = 
currentSegmentFile.memtable().segmentInfo(groupId);
+
+        // We need to consult with the latest memtable in case it contains a 
prefix tombstone.
+        if (segmentInfo != null && segmentInfo.firstIndexKept() != -1) {
+            return segmentInfo.firstIndexKept();
+        }
+
         long firstLogIndexFromIndexStorage = 
indexFileManager.firstLogIndexInclusive(groupId);
 
         if (firstLogIndexFromIndexStorage != -1) {
             return firstLogIndexFromIndexStorage;
         }
 
-        SegmentFileWithMemtable currentSegmentFile = 
this.currentSegmentFile.get();
-
-        SegmentInfo segmentInfo = 
currentSegmentFile.memtable().segmentInfo(groupId);
-
-        return segmentInfo != null ? segmentInfo.firstLogIndexInclusive() : -1;
+        return segmentInfo == null ? -1 : segmentInfo.firstLogIndexInclusive();
     }
 
     /**
@@ -462,11 +498,11 @@ class SegmentFileManager implements ManuallyCloseable {
         return fileSize - HEADER_RECORD.length;
     }
 
-    private @Nullable ByteBuffer readFromOtherSegmentFiles(long groupId, long 
logIndex) throws IOException {
+    private EntrySearchResult readFromOtherSegmentFiles(long groupId, long 
logIndex) throws IOException {
         SegmentFilePointer segmentFilePointer = 
indexFileManager.getSegmentFilePointer(groupId, logIndex);
 
         if (segmentFilePointer == null) {
-            return null;
+            return EntrySearchResult.notFound();
         }
 
         Path path = 
segmentFilesDir.resolve(segmentFileName(segmentFilePointer.fileOrdinal(), 0));
@@ -474,41 +510,9 @@ class SegmentFileManager implements ManuallyCloseable {
         // TODO: Add a cache for recently accessed segment files, see 
https://issues.apache.org/jira/browse/IGNITE-26622.
         SegmentFile segmentFile = SegmentFile.openExisting(path);
 
-        return 
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
-    }
-
-    private WriteModeIndexMemTable recoverMemtable(SegmentFile segmentFile, 
Path segmentFilePath) {
-        ByteBuffer buffer = segmentFile.buffer();
-
-        validateSegmentFileHeader(buffer, segmentFilePath);
+        ByteBuffer buffer = 
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
 
-        var memtable = new IndexMemTable(stripes);
-
-        while (!endOfSegmentReached(buffer)) {
-            int segmentFilePayloadOffset = buffer.position();
-
-            long groupId = buffer.getLong();
-
-            int payloadLength = buffer.getInt();
-
-            if (payloadLength == TRUNCATE_SUFFIX_RECORD_MARKER) {
-                long lastLogIndexKept = buffer.getLong();
-
-                memtable.truncateSuffix(groupId, lastLogIndexKept);
-
-                buffer.position(buffer.position() + HASH_SIZE_BYTES);
-            } else {
-                int endOfRecordPosition = buffer.position() + payloadLength + 
HASH_SIZE_BYTES;
-
-                long index = VarlenEncoder.readLong(buffer);
-
-                memtable.appendSegmentFileOffset(groupId, index, 
segmentFilePayloadOffset);
-
-                buffer.position(endOfRecordPosition);
-            }
-        }
-
-        return memtable;
+        return EntrySearchResult.success(buffer);
     }
 
     private static boolean endOfSegmentReached(ByteBuffer buffer) {
@@ -529,17 +533,34 @@ class SegmentFileManager implements ManuallyCloseable {
 
     /**
      * Creates an index memtable from the given segment file. Unlike {@link 
#recoverMemtable} which is expected to only be called on
-     * "complete" segment files (i.e. those that has experienced a rollover), 
this method is expected to be called on the most recent,
+     * "complete" segment files (i.e. those that have experienced a rollover) 
this method is expected to be called on the most recent,
      * possibly incomplete segment file.
      */
     private WriteModeIndexMemTable recoverLatestMemtable(SegmentFile 
segmentFile, Path segmentFilePath) {
+        return recoverMemtable(segmentFile, segmentFilePath, true);
+    }
+
+    /**
+     * Creates an index memtable from the given segment file. This method is 
expected to be called only on "complete" segment files
+     * (i.e. those that have experienced a rollover).
+     *
+     * <p>This method skips CRC validation, because it is used to identify the 
end of incomplete segment files (and, by definition, this can
+     * never happen during this method's invocation), not to validate storage 
integrity.
+     */
+    private WriteModeIndexMemTable recoverMemtable(SegmentFile segmentFile, 
Path segmentFilePath) {
+        // We skip CRC validation during recovery of already "rollovered" 
segment files, because CRC validation is only used to find an end
+        // of an incomplete segment file, not to check for storage integrity.
+        return recoverMemtable(segmentFile, segmentFilePath, false);
+    }
+
+    private WriteModeIndexMemTable recoverMemtable(SegmentFile segmentFile, 
Path segmentFilePath, boolean validateCrc) {
         ByteBuffer buffer = segmentFile.buffer();
 
         validateSegmentFileHeader(buffer, segmentFilePath);
 
         var memtable = new IndexMemTable(stripes);
 
-        while (buffer.remaining() > SWITCH_SEGMENT_RECORD.length) {
+        while (!endOfSegmentReached(buffer)) {
             int segmentFilePayloadOffset = buffer.position();
 
             long groupId = buffer.getLong();
@@ -556,11 +577,24 @@ class SegmentFileManager implements ManuallyCloseable {
                 buffer.position(segmentFilePayloadOffset);
 
                 // CRC violation signals the end of meaningful data in the 
segment file.
-                if (!isCrcValid(buffer, crcPosition)) {
+                if (validateCrc && !isCrcValid(buffer, crcPosition)) {
                     break;
                 }
 
                 memtable.truncateSuffix(groupId, lastLogIndexKept);
+            } else if (payloadLength == TRUNCATE_PREFIX_RECORD_MARKER) {
+                long firstLogIndexKept = buffer.getLong();
+
+                crcPosition = buffer.position();
+
+                buffer.position(segmentFilePayloadOffset);
+
+                // CRC violation signals the end of meaningful data in the 
segment file.
+                if (validateCrc && !isCrcValid(buffer, crcPosition)) {
+                    break;
+                }
+
+                memtable.truncatePrefix(groupId, firstLogIndexKept);
             } else {
                 crcPosition = buffer.position() + payloadLength;
 
@@ -569,14 +603,14 @@ class SegmentFileManager implements ManuallyCloseable {
                 buffer.position(segmentFilePayloadOffset);
 
                 // CRC violation signals the end of meaningful data in the 
segment file.
-                if (!isCrcValid(buffer, crcPosition)) {
+                if (validateCrc && !isCrcValid(buffer, crcPosition)) {
                     break;
                 }
 
                 memtable.appendSegmentFileOffset(groupId, index, 
segmentFilePayloadOffset);
             }
 
-            buffer.position(crcPosition + HASH_SIZE_BYTES);
+            buffer.position(crcPosition + CRC_SIZE_BYTES);
         }
 
         return memtable;
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
index ffffb17f72e..34ea4c3e567 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
+import static java.lang.Math.toIntExact;
+
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
 import java.nio.ByteBuffer;
@@ -24,8 +26,29 @@ import java.util.Arrays;
 
 /**
  * Information about a segment file for single Raft Group stored in a {@link 
IndexMemTable}.
+ *
+ * <p>It consists of a base log index and an array of segment file offsets 
which stores in log entry offsets which indices lie in the
+ * {@code [logIndexBase, logIndexBase + segmentFileOffsets.size)} range.
+ *
+ * <p>Objects of this class can represent special conditions in presence of 
log truncations:
+ *
+ * <ul>
+ *     <li>When a suffix truncation happens, a "suffix tombstone" is inserted 
with {@code logIndexBase} equal to the next log index
+ *     after the cutoff value and an empty offsets array. This array can then 
be populated with new entries, as usual. This means that
+ *     {@link #firstLogIndexInclusive()} can be equal to {@link 
#lastLogIndexExclusive()} if this is a "pure" suffix tombstone
+ *     (i.e. tombstone without any values added after it was created);</li>
+ *     <li>When a prefix truncation happens, a "prefix tombstone" is inserted. 
Depending on the state of the object, it can either be a
+ *     "pure" prefix tombstone ({@code logIndexBase = -1}, {@code 
firstIndexKept = <cutoff value>}, empty offsets array), this means that
+ *     a memtable didn't contain any data for a particular Raft group at the 
moment of truncation, or a "regular" prefix tombstone
+ *     ({@code logIndexBase > 0}, {@code firstIndexKept=<cutoff value>}, 
non-empty offsets array), which means that some entries were
+ *     inserted (or already existed) after the cutoff index. This means that 
if {@link #firstIndexKept()} is not equal to {@code -1}
+ *     (i.e. this is a prefix tombstone), an additional check is required 
({@link #firstLogIndexInclusive()} == -1) to identify if this is a
+ *     "pure" prefix tombstone.</li>
+ * </ul>
  */
 class SegmentInfo {
+    static int MISSING_SEGMENT_FILE_OFFSET = 0;
+
     private static class ArrayWithSize {
         private static final int INITIAL_CAPACITY = 10;
 
@@ -56,13 +79,25 @@ class SegmentInfo {
             return new ArrayWithSize(array, size + 1);
         }
 
-        ArrayWithSize truncate(int newSize) {
+        ArrayWithSize truncateSuffix(int newSize) {
+            return truncate(0, newSize);
+        }
+
+        ArrayWithSize truncatePrefix(int newSize) {
+            int srcPos = size - newSize;
+
+            return truncate(srcPos, newSize);
+        }
+
+        private ArrayWithSize truncate(int srcPos, int newSize) {
             assert newSize <= size
                     : String.format("Array must shrink on truncation, current 
size: %d, size after truncation: %d", size, newSize);
 
-            int[] newArray = new int[size];
+            // We use the original array's size, not "newSize" here, because 
new entries are expected to be added at the end anyway,
+            // so we can use the larger size to avoid unnecessary array copies.
+            int[] newArray = new int[array.length];
 
-            System.arraycopy(array, 0, newArray, 0, newSize);
+            System.arraycopy(array, srcPos, newArray, 0, newSize);
 
             return new ArrayWithSize(newArray, newSize);
         }
@@ -92,20 +127,42 @@ class SegmentInfo {
      */
     private final long logIndexBase;
 
+    /**
+     * Special log index value used to indicate that a prefix truncation 
command has been executed. Is equal to the cutoff log index value
+     * or {@code -1}, if no prefix truncations happened.
+     */
+    private final long firstIndexKept;
+
     /**
      * Offsets in a segment file.
      */
     @SuppressWarnings("FieldMayBeFinal") // Updated through a VarHandle.
-    private volatile ArrayWithSize segmentFileOffsets = new ArrayWithSize();
+    private volatile ArrayWithSize segmentFileOffsets;
 
     SegmentInfo(long logIndexBase) {
+        this(logIndexBase, -1, new ArrayWithSize());
+    }
+
+    SegmentInfo(long logIndexBase, long firstIndexKept) {
+        this(logIndexBase, firstIndexKept, new ArrayWithSize());
+    }
+
+    static SegmentInfo prefixTombstone(long firstIndexKept) {
+        return new SegmentInfo(-1, firstIndexKept);
+    }
+
+    private SegmentInfo(long logIndexBase, long firstIndexKept, ArrayWithSize 
segmentFileOffsets) {
         this.logIndexBase = logIndexBase;
+        this.firstIndexKept = firstIndexKept;
+        this.segmentFileOffsets = segmentFileOffsets;
     }
 
     /**
      * Puts the given segment file offset under the given log index.
      */
     void addOffset(long logIndex, int segmentFileOffset) {
+        assert segmentFileOffset != MISSING_SEGMENT_FILE_OFFSET : "Segment 
file offset cannot be 0";
+
         ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
 
         // Check that log indexes are monotonically increasing.
@@ -113,29 +170,23 @@ class SegmentInfo {
                 String.format("Log indexes are not monotonically increasing 
[logIndex=%d, expectedLogIndex=%d].",
                         logIndex, logIndexBase + segmentFileOffsets.size());
 
-        ArrayWithSize newSegmentFileOffsets = 
segmentFileOffsets.add(segmentFileOffset);
-
-        // Simple assignment would suffice, since we only have one thread 
writing to this field, but we use compareAndSet to verify
-        // this invariant, just in case.
-        boolean updated = SEGMENT_FILE_OFFSETS_VH.compareAndSet(this, 
segmentFileOffsets, newSegmentFileOffsets);
-
-        assert updated : "Concurrent writes detected";
+        setSegmentFileOffsets(segmentFileOffsets, 
segmentFileOffsets.add(segmentFileOffset));
     }
 
     /**
-     * Returns the segment file offset for the given log index or {@code 0} if 
the log index was not found.
+     * Returns the segment file offset for the given log index or {@link 
#MISSING_SEGMENT_FILE_OFFSET} if the log index was not found.
      */
     int getOffset(long logIndex) {
         long offsetIndex = logIndex - logIndexBase;
 
         if (offsetIndex < 0) {
-            return 0;
+            return MISSING_SEGMENT_FILE_OFFSET;
         }
 
         ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
 
         if (offsetIndex >= segmentFileOffsets.size()) {
-            return 0;
+            return MISSING_SEGMENT_FILE_OFFSET;
         }
 
         return segmentFileOffsets.get((int) offsetIndex);
@@ -155,6 +206,17 @@ class SegmentInfo {
         return logIndexBase + segmentFileOffsets.size();
     }
 
+    /**
+     * Returns the log index used during prefix truncation or {@code -1} if no 
prefix truncation was issued.
+     */
+    long firstIndexKept() {
+        return firstIndexKept;
+    }
+
+    boolean isPrefixTombstone() {
+        return logIndexBase == -1;
+    }
+
     /**
      * Returns the number of offsets stored in this memtable.
      */
@@ -168,29 +230,76 @@ class SegmentInfo {
     void saveOffsetsTo(ByteBuffer buffer) {
         ArrayWithSize offsets = segmentFileOffsets;
 
+        assert offsets.size() > 0 : "Offsets array must not be empty";
+
         buffer.asIntBuffer().put(offsets.array, 0, offsets.size);
     }
 
     /**
      * Removes all data which log indices are strictly greater than {@code 
lastLogIndexKept}.
      */
-    void truncateSuffix(long lastLogIndexKept) {
+    SegmentInfo truncateSuffix(long lastLogIndexKept) {
         assert lastLogIndexKept >= logIndexBase : 
String.format("logIndexBase=%d, lastLogIndexKept=%d", logIndexBase, 
lastLogIndexKept);
 
         ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
 
-        long newSize = lastLogIndexKept - logIndexBase + 1;
+        long lastLogIndexExclusive = logIndexBase + segmentFileOffsets.size();
 
-        // Not using an assertion here, because this value comes doesn't come 
from the storage code.
-        if (newSize > segmentFileOffsets.size()) {
+        // Not using an assertion here, because this value doesn't come from 
the storage code.
+        if (lastLogIndexKept >= lastLogIndexExclusive) {
             throw new IllegalArgumentException(String.format(
                     "lastLogIndexKept is too large. Last index in memtable: 
%d, lastLogIndexKept: %d",
-                    logIndexBase + segmentFileOffsets.size() - 1, 
lastLogIndexKept
+                    lastLogIndexExclusive - 1, lastLogIndexKept
             ));
         }
 
-        ArrayWithSize newSegmentFileOffsets = 
segmentFileOffsets.truncate((int) newSize);
+        int newSize = toIntExact(lastLogIndexKept - logIndexBase + 1);
+
+        setSegmentFileOffsets(segmentFileOffsets, 
segmentFileOffsets.truncateSuffix(newSize));
+
+        // This could have been a "void" method, but this way it looks 
consistent with "truncatePrefix". Since the tail of the
+        // {@code segmentFileOffsets} array is mutable, we can avoid creating 
a new object.
+        return this;
+    }
+
+    /**
+     * Removes all data which log indices are strictly smaller than {@code 
firstIndexKept}.
+     */
+    SegmentInfo truncatePrefix(long firstIndexKept) {
+        if (isPrefixTombstone()) {
+            if (firstIndexKept <= this.firstIndexKept) {
+                throw new IllegalStateException(String.format(
+                        "Trying to truncate an already truncated prefix 
[curFirstIndexKept=%d, newFirstIndexKept=%d]",
+                        this.firstIndexKept, firstIndexKept
+                ));
+            }
+
+            return prefixTombstone(firstIndexKept);
+        }
+
+        ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
+
+        if (firstIndexKept < logIndexBase) {
+            // Add the prefix tombstone property to the current SegmentInfo.
+            return new SegmentInfo(logIndexBase, firstIndexKept, 
segmentFileOffsets);
+        }
+
+        long lastLogIndexExclusive = logIndexBase + segmentFileOffsets.size();
+
+        // Not using an assertion here, because this value doesn't come from 
the storage code.
+        if (firstIndexKept >= lastLogIndexExclusive) {
+            throw new IllegalArgumentException(String.format(
+                    "firstIndexKept is too large. Last index in memtable: %d, 
firstIndexKept: %d",
+                    lastLogIndexExclusive - 1, firstIndexKept
+            ));
+        }
+
+        int newSize = toIntExact(lastLogIndexExclusive - firstIndexKept);
+
+        return new SegmentInfo(firstIndexKept, firstIndexKept, 
segmentFileOffsets.truncatePrefix(newSize));
+    }
 
+    private void setSegmentFileOffsets(ArrayWithSize segmentFileOffsets, 
ArrayWithSize newSegmentFileOffsets) {
         // Simple assignment would suffice, since we only have one thread 
writing to this field, but we use compareAndSet to verify
         // this invariant, just in case.
         boolean updated = SEGMENT_FILE_OFFSETS_VH.compareAndSet(this, 
segmentFileOffsets, newSegmentFileOffsets);
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
index 94fd4de599c..6428cfda804 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
@@ -35,17 +35,26 @@ class SegmentPayload {
 
     static final int LENGTH_SIZE_BYTES = Integer.BYTES;
 
-    static final int HASH_SIZE_BYTES = Integer.BYTES;
+    static final int CRC_SIZE_BYTES = Integer.BYTES;
 
     /**
      * Length of the byte sequence that is written when suffix truncation 
happens.
      *
-     * <p>Format: {@code groupId, 0 (special length value), last kept index, 
crc}
+     * <p>Format: {@code groupId, TRUNCATE_SUFFIX_RECORD_MARKER (special 
length value), last kept index, crc}
      */
-    static final int TRUNCATE_SUFFIX_RECORD_SIZE = GROUP_ID_SIZE_BYTES + 
LENGTH_SIZE_BYTES + Long.BYTES + HASH_SIZE_BYTES;
+    static final int TRUNCATE_SUFFIX_RECORD_SIZE = GROUP_ID_SIZE_BYTES + 
LENGTH_SIZE_BYTES + Long.BYTES + CRC_SIZE_BYTES;
+
+    /**
+     * Length of the byte sequence that is written when prefix truncation 
happens.
+     *
+     * <p>Format: {@code groupId, TRUNCATE_PREFIX_RECORD_MARKER (special 
length value), first kept index, crc}
+     */
+    static final int TRUNCATE_PREFIX_RECORD_SIZE = TRUNCATE_SUFFIX_RECORD_SIZE;
 
     static final int TRUNCATE_SUFFIX_RECORD_MARKER = 0;
 
+    static final int TRUNCATE_PREFIX_RECORD_MARKER = -1;
+
     static void writeTo(
             ByteBuffer buffer,
             long groupId,
@@ -66,28 +75,33 @@ class SegmentPayload {
 
         logEntryEncoder.encode(buffer, logEntry);
 
-        int dataSize = buffer.position() - originalPos;
+        int recordSize = buffer.position() - originalPos;
 
-        // Rewind the position for CRC calculation.
-        buffer.position(originalPos);
-
-        int crc = FastCrc.calcCrc(buffer, dataSize);
-
-        // After CRC calculation the position will be at the provided end of 
the buffer.
-        buffer.putInt(crc);
+        writeCrc(buffer, recordSize);
     }
 
     static void writeTruncateSuffixRecordTo(ByteBuffer buffer, long groupId, 
long lastLogIndexKept) {
-        int originalPos = buffer.position();
-
         buffer
                 .putLong(groupId)
                 .putInt(TRUNCATE_SUFFIX_RECORD_MARKER)
                 .putLong(lastLogIndexKept);
 
-        buffer.position(originalPos);
+        writeCrc(buffer, TRUNCATE_SUFFIX_RECORD_SIZE - CRC_SIZE_BYTES);
+    }
+
+    static void writeTruncatePrefixRecordTo(ByteBuffer buffer, long groupId, 
long firstIndexKept) {
+        buffer
+                .putLong(groupId)
+                .putInt(TRUNCATE_PREFIX_RECORD_MARKER)
+                .putLong(firstIndexKept);
+
+        writeCrc(buffer, TRUNCATE_PREFIX_RECORD_SIZE - CRC_SIZE_BYTES);
+    }
+
+    private static void writeCrc(ByteBuffer buffer, int recordSizeWithoutCrc) {
+        buffer.position(buffer.position() - recordSizeWithoutCrc);
 
-        int crc = FastCrc.calcCrc(buffer, TRUNCATE_SUFFIX_RECORD_SIZE - 
HASH_SIZE_BYTES);
+        int crc = FastCrc.calcCrc(buffer, recordSizeWithoutCrc);
 
         buffer.putInt(crc);
     }
@@ -126,7 +140,7 @@ class SegmentPayload {
         buffer.get(entryBytes);
 
         // Move the position as if we have read the whole payload.
-        buffer.position(buffer.position() + HASH_SIZE_BYTES);
+        buffer.position(buffer.position() + CRC_SIZE_BYTES);
 
         return logEntryDecoder.decode(entryBytes);
     }
@@ -140,6 +154,6 @@ class SegmentPayload {
     }
 
     static int fixedOverheadSize() {
-        return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE_BYTES;
+        return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + CRC_SIZE_BYTES;
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
index 99191668774..186d3031e8c 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorage.java
@@ -142,7 +142,15 @@ class SegstoreLogStorage implements LogStorage {
 
     @Override
     public boolean truncatePrefix(long firstIndexKept) {
-        throw new UnsupportedOperationException();
+        try {
+            segmentFileManager.truncatePrefix(groupId, firstIndexKept);
+        } catch (IOException e) {
+            throw new IgniteInternalException(INTERNAL_ERR, e);
+        }
+
+        firstLogIndexInclusive = firstIndexKept;
+
+        return true;
     }
 
     @Override
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
index df8bd40e1d4..96a8babfad5 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/WriteModeIndexMemTable.java
@@ -48,6 +48,11 @@ interface WriteModeIndexMemTable {
      */
     void truncateSuffix(long groupId, long lastLogIndexKept);
 
+    /**
+     * Removes all offsets for the given Raft group which log indices are 
strictly smaller than {@code firstIndexKept}.
+     */
+    void truncatePrefix(long groupId, long firstIndexKept);
+
     /**
      * Returns the read-only version of this memtable.
      */
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
index 53a5dc11396..66519753345 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/CheckpointQueueTest.java
@@ -179,7 +179,7 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
         int numEntries = 10_000;
 
         RunnableX producerTask = () -> {
-            for (int i = 0; i < numEntries; i++) {
+            for (int i = 1; i <= numEntries; i++) {
                 ReadModeIndexMemTable mockTable = 
mock(ReadModeIndexMemTable.class);
 
                 var segmentInfo = new SegmentInfo(0);
@@ -193,7 +193,7 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
         };
 
         RunnableX consumerTask = () -> {
-            for (int i = 0; i < numEntries; i++) {
+            for (int i = 1; i <= numEntries; i++) {
                 Entry entry = queue.peekHead();
 
                 assertThat(entry.memTable().segmentInfo(0).getOffset(0), 
is(i));
@@ -212,7 +212,7 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
         int numEntries = 10_000;
 
         RunnableX producerTask = () -> {
-            for (int i = 0; i < numEntries; i++) {
+            for (int i = 1; i <= numEntries; i++) {
                 ReadModeIndexMemTable mockTable = 
mock(ReadModeIndexMemTable.class);
 
                 var segmentInfo = new SegmentInfo(0);
@@ -226,7 +226,7 @@ class CheckpointQueueTest extends BaseIgniteAbstractTest {
         };
 
         RunnableX consumerTask = () -> {
-            for (int i = 0; i < numEntries; i++) {
+            for (int i = 1; i <= numEntries; i++) {
                 Entry entry = queue.peekHead();
 
                 assertThat(entry.memTable().segmentInfo(0).getOffset(0), 
is(i));
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
index 4f7eb6d0faf..42cd54a6396 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.CRC_SIZE_BYTES;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.GROUP_ID_SIZE_BYTES;
-import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.HASH_SIZE_BYTES;
 import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.LENGTH_SIZE_BYTES;
 import static org.apache.ignite.internal.raft.util.VarlenEncoder.readLong;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -63,9 +63,9 @@ class DeserializedSegmentPayload {
 
         int payloadLength = readFully(channel, LENGTH_SIZE_BYTES).getInt();
 
-        ByteBuffer remaining = readFully(channel, payloadLength + 
HASH_SIZE_BYTES);
+        ByteBuffer remaining = readFully(channel, payloadLength + 
CRC_SIZE_BYTES);
 
-        ByteBuffer fullEntry = ByteBuffer.allocate(payloadLength + 
GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE_BYTES)
+        ByteBuffer fullEntry = ByteBuffer.allocate(payloadLength + 
GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + CRC_SIZE_BYTES)
                 .order(SegmentFile.BYTE_ORDER)
                 .putLong(groupId)
                 .putInt(payloadLength)
@@ -113,7 +113,7 @@ class DeserializedSegmentPayload {
     }
 
     int size() {
-        return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + payload.length + 
HASH_SIZE_BYTES;
+        return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + payload.length + 
CRC_SIZE_BYTES;
     }
 
     private static ByteBuffer readFully(ReadableByteChannel byteChannel, int 
len) throws IOException {
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
index ceca38d7b56..da01d0eb8c7 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
@@ -195,22 +195,83 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
                             expectedFileOrdinal
                     );
 
-                    var expectedMetaWithoutOverlap = new IndexFileMeta(
-                            expectedFirstLogIndex + overlap - 
logEntriesPerFile,
-                            expectedFirstLogIndex + overlap - 1,
-                            0,
-                            expectedFileOrdinal - 1
-                    );
-
-                    // We can possibly be reading from two different metas - 
from the newer one (that overlaps the older one) or
-                    // the older one.
-                    assertThat(
-                            logIndex + " -> " + indexFileMeta,
-                            indexFileMeta, 
either(is(expectedMetaWithOverlap)).or(is(expectedMetaWithoutOverlap)));
+                    if (expectedFirstLogIndex == 0) {
+                        assertThat(logIndex + " -> " + indexFileMeta, 
indexFileMeta, is(expectedMetaWithOverlap));
+                    } else {
+                        var expectedMetaWithoutOverlap = new IndexFileMeta(
+                                expectedFirstLogIndex + overlap - 
logEntriesPerFile,
+                                expectedFirstLogIndex + overlap - 1,
+                                0,
+                                expectedFileOrdinal - 1
+                        );
+
+                        // We can possibly be reading from two different metas 
- from the newer one (that overlaps the older one) or
+                        // the older one.
+                        assertThat(
+                                logIndex + " -> " + indexFileMeta,
+                                indexFileMeta, 
either(is(expectedMetaWithOverlap)).or(is(expectedMetaWithoutOverlap))
+                        );
+                    }
                 }
             }
         };
 
         runRace(writer, reader, reader, reader);
     }
+
+    @Test
+    void testTruncatePrefix() {
+        var meta1 = new IndexFileMeta(1, 100, 0, 0);
+        var meta2 = new IndexFileMeta(42, 100, 42, 1);
+        var meta3 = new IndexFileMeta(100, 120, 66, 2);
+        var meta4 = new IndexFileMeta(110, 200, 95, 3);
+
+        var groupMeta = new GroupIndexMeta(meta1);
+
+        groupMeta.addIndexMeta(meta2);
+        groupMeta.addIndexMeta(meta3);
+        groupMeta.addIndexMeta(meta4);
+
+        assertThat(groupMeta.firstLogIndexInclusive(), is(1L));
+        assertThat(groupMeta.lastLogIndexExclusive(), is(200L));
+
+        assertThat(groupMeta.indexMeta(10), is(meta1));
+        assertThat(groupMeta.indexMeta(42), is(meta2));
+        assertThat(groupMeta.indexMeta(100), is(meta3));
+        assertThat(groupMeta.indexMeta(110), is(meta4));
+
+        groupMeta.truncatePrefix(43);
+
+        assertThat(groupMeta.indexMeta(10), is(nullValue()));
+        assertThat(groupMeta.indexMeta(42), is(nullValue()));
+
+        // Payload offset is shifted 4 bytes in order to skip the truncated 
entry.
+        var trimmedMeta = new IndexFileMeta(43, 100, 46, 1);
+
+        assertThat(groupMeta.indexMeta(43), is(trimmedMeta));
+        assertThat(groupMeta.indexMeta(100), is(meta3));
+        assertThat(groupMeta.indexMeta(110), is(meta4));
+
+        groupMeta.truncatePrefix(110);
+
+        assertThat(groupMeta.indexMeta(43), is(nullValue()));
+        assertThat(groupMeta.indexMeta(100), is(nullValue()));
+        assertThat(groupMeta.indexMeta(110), is(meta4));
+    }
+
+    @Test
+    void testTruncatePrefixRemovesAllEntriesWhenKeptAfterLast() {
+        var meta1 = new IndexFileMeta(1, 10, 0, 0);
+        var meta2 = new IndexFileMeta(10, 20, 100, 1);
+
+        var groupMeta = new GroupIndexMeta(meta1);
+        groupMeta.addIndexMeta(meta2);
+
+        // Truncate to the end of last meta - everything should be removed.
+        groupMeta.truncatePrefix(20);
+
+        assertThat(groupMeta.indexMeta(0), is(nullValue()));
+        assertThat(groupMeta.indexMeta(19), is(nullValue()));
+        assertThat(groupMeta.firstLogIndexInclusive(), is(-1L));
+    }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
index 62a08638b05..cb76a4adc53 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
@@ -215,7 +215,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
     }
 
     @Test
-    void testFirstLastLogIndicesWithTruncate() throws IOException {
+    void testFirstLastLogIndicesWithTruncateSuffix() throws IOException {
         var memtable = new IndexMemTable(STRIPES);
 
         memtable.appendSegmentFileOffset(0, 1, 1);
@@ -237,6 +237,29 @@ class IndexFileManagerTest extends IgniteAbstractTest {
         assertThat(indexFileManager.lastLogIndexExclusive(0), is(2L));
     }
 
+    @Test
+    void testFirstLastLogIndicesWithTruncatePrefix() throws IOException {
+        var memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 1, 1);
+        memtable.appendSegmentFileOffset(0, 2, 1);
+        memtable.appendSegmentFileOffset(0, 3, 1);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
+        assertThat(indexFileManager.lastLogIndexExclusive(0), is(4L));
+
+        memtable = new IndexMemTable(STRIPES);
+
+        memtable.truncatePrefix(0, 2);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        assertThat(indexFileManager.firstLogIndexInclusive(0), is(2L));
+        assertThat(indexFileManager.lastLogIndexExclusive(0), is(4L));
+    }
+
     @Test
     void testGetSegmentPointerWithTruncate() throws IOException {
         var memtable = new IndexMemTable(STRIPES);
@@ -329,6 +352,31 @@ class IndexFileManagerTest extends IgniteAbstractTest {
         assertThat(indexFileManager.getSegmentFilePointer(0, 3), 
is(nullValue()));
     }
 
+    @Test
+    void testRecoveryWithTruncatePrefix() throws IOException {
+        var memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 1, 1);
+        memtable.appendSegmentFileOffset(0, 2, 2);
+        memtable.appendSegmentFileOffset(0, 3, 3);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        memtable = new IndexMemTable(STRIPES);
+
+        memtable.truncatePrefix(0, 2);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        indexFileManager = new IndexFileManager(workDir);
+
+        indexFileManager.start();
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), 
is(nullValue()));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new 
SegmentFilePointer(0, 2)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(new 
SegmentFilePointer(0, 3)));
+    }
+
     @Test
     void testExists() throws IOException {
         assertThat(indexFileManager.indexFileExists(0), is(false));
@@ -371,4 +419,70 @@ class IndexFileManagerTest extends IgniteAbstractTest {
         assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new 
SegmentFilePointer(5, 1)));
         assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new 
SegmentFilePointer(6, 2)));
     }
+
+    @Test
+    void testTruncatePrefix() throws IOException {
+        var memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 1, 1);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 2, 1);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 3, 1);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        memtable = new IndexMemTable(STRIPES);
+
+        memtable.truncatePrefix(0, 2);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), 
is(nullValue()));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new 
SegmentFilePointer(1, 1)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(new 
SegmentFilePointer(2, 1)));
+    }
+
+    @Test
+    void testCombinationOfPrefixAndSuffixTombstones() throws IOException {
+        var memtable = new IndexMemTable(STRIPES);
+
+        memtable.appendSegmentFileOffset(0, 1, 1);
+        memtable.appendSegmentFileOffset(0, 2, 2);
+        memtable.appendSegmentFileOffset(0, 3, 3);
+        memtable.appendSegmentFileOffset(0, 4, 4);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        memtable = new IndexMemTable(STRIPES);
+
+        memtable.truncatePrefix(0, 2);
+
+        memtable.truncateSuffix(0, 3);
+
+        indexFileManager.saveIndexMemtable(memtable);
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), 
is(nullValue()));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new 
SegmentFilePointer(0, 2)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(new 
SegmentFilePointer(0, 3)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 4), 
is(nullValue()));
+
+        // Restart the manager to check recovery.
+        indexFileManager = new IndexFileManager(workDir);
+
+        indexFileManager.start();
+
+        assertThat(indexFileManager.getSegmentFilePointer(0, 1), 
is(nullValue()));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new 
SegmentFilePointer(0, 2)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 3), is(new 
SegmentFilePointer(0, 3)));
+        assertThat(indexFileManager.getSegmentFilePointer(0, 4), 
is(nullValue()));
+    }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
index 55d24237ebb..23a4e61f6d0 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
@@ -34,6 +34,8 @@ class IndexFileMetaArrayTest extends BaseIgniteAbstractTest {
 
         assertThat(array.size(), is(1));
         assertThat(array.get(0), is(initialMeta));
+        assertThat(array.firstLogIndexInclusive(), is(1L));
+        assertThat(array.lastLogIndexExclusive(), is(2L));
 
         var meta2 = new IndexFileMeta(2, 3, 0, 1);
 
@@ -41,6 +43,8 @@ class IndexFileMetaArrayTest extends BaseIgniteAbstractTest {
 
         assertThat(array.size(), is(2));
         assertThat(array.get(1), is(meta2));
+        assertThat(array.firstLogIndexInclusive(), is(1L));
+        assertThat(array.lastLogIndexExclusive(), is(3L));
 
         for (int i = 0; i < INITIAL_CAPACITY; i++) {
             long logIndex = meta2.firstLogIndexInclusive() + i + 1;
@@ -54,6 +58,8 @@ class IndexFileMetaArrayTest extends BaseIgniteAbstractTest {
 
         assertThat(array.size(), is(3 + INITIAL_CAPACITY));
         assertThat(array.get(array.size() - 1), is(meta3));
+        assertThat(array.firstLogIndexInclusive(), is(1L));
+        assertThat(array.lastLogIndexExclusive(), is(INITIAL_CAPACITY + 4L));
     }
 
     @Test
@@ -105,4 +111,37 @@ class IndexFileMetaArrayTest extends 
BaseIgniteAbstractTest {
         assertThat(array.find(9), is(meta1));
         assertThat(array.find(10), is(meta3));
     }
+
+    @Test
+    void testTruncateInsideMetaAdjustsPayloadOffset() {
+        var meta1 = new IndexFileMeta(1, 10, 100, 0);
+        var meta2 = new IndexFileMeta(10, 20, 200, 1);
+
+        IndexFileMetaArray array = new IndexFileMetaArray(meta1).add(meta2);
+
+        IndexFileMetaArray truncated = array.truncateIndicesSmallerThan(5);
+
+        assertThat(truncated.size(), is(2));
+
+        IndexFileMeta trimmedMeta = truncated.get(0);
+
+        assertThat(trimmedMeta.firstLogIndexInclusive(), is(5L));
+        assertThat(trimmedMeta.lastLogIndexExclusive(), is(10L));
+        assertThat(trimmedMeta.indexFilePayloadOffset(), is(100 + 4 * 
Integer.BYTES));
+
+        assertThat(truncated.get(1), is(meta2));
+    }
+
+    @Test
+    void testTruncateToMetaBoundarySkipsPreviousMeta() {
+        var meta1 = new IndexFileMeta(1, 10, 100, 0);
+        var meta2 = new IndexFileMeta(10, 20, 200, 1);
+
+        IndexFileMetaArray array = new IndexFileMetaArray(meta1).add(meta2);
+
+        IndexFileMetaArray truncated = array.truncateIndicesSmallerThan(10);
+
+        assertThat(truncated.size(), is(1));
+        assertThat(truncated.get(0), is(meta2));
+    }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
index afc2263f9cf..a7c4e14749c 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.raft.storage.segstore;
 
+import static 
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -57,9 +59,9 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
     void testMissingValue() {
         memTable.appendSegmentFileOffset(0, 5, 1);
 
-        assertThat(memTable.segmentInfo(0).getOffset(1), is(0));
+        assertThat(memTable.segmentInfo(0).getOffset(1), 
is(MISSING_SEGMENT_FILE_OFFSET));
         assertThat(memTable.segmentInfo(0).getOffset(5), is(1));
-        assertThat(memTable.segmentInfo(0).getOffset(6), is(0));
+        assertThat(memTable.segmentInfo(0).getOffset(6), 
is(MISSING_SEGMENT_FILE_OFFSET));
         assertThat(memTable.segmentInfo(1), is(nullValue()));
     }
 
@@ -81,11 +83,11 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
             if (groupId == 0) {
                 assertThat(segmentInfo.getOffset(0), is(1));
                 assertThat(segmentInfo.getOffset(1), is(2));
-                assertThat(segmentInfo.getOffset(2), is(0));
+                assertThat(segmentInfo.getOffset(2), 
is(MISSING_SEGMENT_FILE_OFFSET));
             } else {
                 assertThat(segmentInfo.getOffset(0), is(3));
                 assertThat(segmentInfo.getOffset(1), is(4));
-                assertThat(segmentInfo.getOffset(2), is(0));
+                assertThat(segmentInfo.getOffset(2), 
is(MISSING_SEGMENT_FILE_OFFSET));
             }
         });
     }
@@ -106,7 +108,7 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
                 SegmentInfo segmentInfo = memTable.segmentInfo(0);
 
                 if (segmentInfo != null) {
-                    assertThat(segmentInfo.getOffset(i), either(is(i + 
1)).or(is(0)));
+                    assertThat(segmentInfo.getOffset(i), either(is(i + 
1)).or(is(MISSING_SEGMENT_FILE_OFFSET)));
                 }
             }
         };
@@ -138,7 +140,7 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
                     SegmentInfo segmentInfo = memTable.segmentInfo(groupId);
 
                     if (segmentInfo != null) {
-                        assertThat(segmentInfo.getOffset(j), either(is(j + 
1)).or(is(0)));
+                        assertThat(segmentInfo.getOffset(j), either(is(j + 
1)).or(is(MISSING_SEGMENT_FILE_OFFSET)));
                     }
                 }
             });
@@ -175,8 +177,8 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
 
         assertThat(memTable.segmentInfo(groupId0).getOffset(1), is(42));
         assertThat(memTable.segmentInfo(groupId0).getOffset(2), is(43));
-        assertThat(memTable.segmentInfo(groupId0).getOffset(3), is(0));
-        assertThat(memTable.segmentInfo(groupId0).getOffset(4), is(0));
+        assertThat(memTable.segmentInfo(groupId0).getOffset(3), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId0).getOffset(4), 
is(MISSING_SEGMENT_FILE_OFFSET));
 
         assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(55));
         assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(56));
@@ -187,8 +189,8 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
 
         assertThat(memTable.segmentInfo(groupId0).getOffset(1), is(42));
         assertThat(memTable.segmentInfo(groupId0).getOffset(2), is(43));
-        assertThat(memTable.segmentInfo(groupId0).getOffset(3), is(0));
-        assertThat(memTable.segmentInfo(groupId0).getOffset(4), is(0));
+        assertThat(memTable.segmentInfo(groupId0).getOffset(3), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId0).getOffset(4), 
is(MISSING_SEGMENT_FILE_OFFSET));
 
         assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(55));
         assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(56));
@@ -199,13 +201,13 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
 
         assertThat(memTable.segmentInfo(groupId0).getOffset(1), is(42));
         assertThat(memTable.segmentInfo(groupId0).getOffset(2), is(43));
-        assertThat(memTable.segmentInfo(groupId0).getOffset(3), is(0));
-        assertThat(memTable.segmentInfo(groupId0).getOffset(4), is(0));
+        assertThat(memTable.segmentInfo(groupId0).getOffset(3), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId0).getOffset(4), 
is(MISSING_SEGMENT_FILE_OFFSET));
 
-        assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(0));
-        assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(0));
-        assertThat(memTable.segmentInfo(groupId1).getOffset(3), is(0));
-        assertThat(memTable.segmentInfo(groupId1).getOffset(4), is(0));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(1), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(2), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(3), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(4), 
is(MISSING_SEGMENT_FILE_OFFSET));
     }
 
     @Test
@@ -225,7 +227,7 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
 
         memTable.truncateSuffix(0, 0);
 
-        assertThat(memTable.segmentInfo(0).getOffset(1), is(0));
+        assertThat(memTable.segmentInfo(0).getOffset(1), 
is(MISSING_SEGMENT_FILE_OFFSET));
 
         memTable.appendSegmentFileOffset(0, 1, 43);
 
@@ -233,19 +235,122 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
     }
 
     @Test
-    void testTruncateIntoThePast() {
+    void testTruncateSuffixIntoThePast() {
         memTable.appendSegmentFileOffset(0, 36, 42);
 
         // Truncate to a position before the moment the last segment info was 
added.
         memTable.truncateSuffix(0, 10);
 
-        assertThat(memTable.segmentInfo(0).getOffset(36), is(0));
-        assertThat(memTable.segmentInfo(0).getOffset(11), is(0));
+        assertThat(memTable.segmentInfo(0).getOffset(36), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(0).getOffset(11), 
is(MISSING_SEGMENT_FILE_OFFSET));
 
         memTable.appendSegmentFileOffset(0, 11, 43);
 
         assertThat(memTable.segmentInfo(0).getOffset(11), is(43));
-        assertThat(memTable.segmentInfo(0).getOffset(12), is(0));
-        assertThat(memTable.segmentInfo(0).getOffset(36), is(0));
+        assertThat(memTable.segmentInfo(0).getOffset(12), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(0).getOffset(36), 
is(MISSING_SEGMENT_FILE_OFFSET));
+    }
+
+    @Test
+    void testTruncatePrefix() {
+        long groupId1 = 1;
+        long groupId2 = 2;
+
+        memTable.appendSegmentFileOffset(groupId1, 1, 42);
+        memTable.appendSegmentFileOffset(groupId1, 2, 43);
+        memTable.appendSegmentFileOffset(groupId1, 3, 44);
+        memTable.appendSegmentFileOffset(groupId1, 4, 45);
+
+        memTable.appendSegmentFileOffset(groupId2, 1, 55);
+        memTable.appendSegmentFileOffset(groupId2, 2, 56);
+        memTable.appendSegmentFileOffset(groupId2, 3, 57);
+        memTable.appendSegmentFileOffset(groupId2, 4, 58);
+
+        memTable.truncatePrefix(groupId1, 1);
+
+        assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(42));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(43));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(3), is(44));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(4), is(45));
+
+        assertThat(memTable.segmentInfo(groupId2).getOffset(1), is(55));
+        assertThat(memTable.segmentInfo(groupId2).getOffset(2), is(56));
+        assertThat(memTable.segmentInfo(groupId2).getOffset(3), is(57));
+        assertThat(memTable.segmentInfo(groupId2).getOffset(4), is(58));
+
+        memTable.truncatePrefix(groupId2, 3);
+
+        assertThat(memTable.segmentInfo(groupId1).getOffset(1), is(42));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(2), is(43));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(3), is(44));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(4), is(45));
+
+        assertThat(memTable.segmentInfo(groupId2).getOffset(1), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId2).getOffset(2), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId2).getOffset(3), is(57));
+        assertThat(memTable.segmentInfo(groupId2).getOffset(4), is(58));
+
+        memTable.truncatePrefix(groupId1, 4);
+
+        assertThat(memTable.segmentInfo(groupId1).getOffset(1), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(2), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(3), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId1).getOffset(4), is(45));
+
+        assertThat(memTable.segmentInfo(groupId2).getOffset(1), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId2).getOffset(2), 
is(MISSING_SEGMENT_FILE_OFFSET));
+        assertThat(memTable.segmentInfo(groupId2).getOffset(3), is(57));
+        assertThat(memTable.segmentInfo(groupId2).getOffset(4), is(58));
+    }
+
+    @Test
+    void testTruncateNonExistingPrefix() {
+        assertDoesNotThrow(() -> memTable.truncatePrefix(0, 4));
+
+        memTable.appendSegmentFileOffset(1, 5, 42);
+
+        assertDoesNotThrow(() -> memTable.truncatePrefix(1, 4));
+        assertThrows(IllegalArgumentException.class, () -> 
memTable.truncatePrefix(1, 10));
+    }
+
+    @Test
+    void testPrefixAndSuffixTombstones() {
+        memTable.truncatePrefix(0, 10);
+
+        memTable.truncateSuffix(0, 15);
+
+        memTable.appendSegmentFileOffset(0, 16, 42);
+
+        SegmentInfo segmentInfo = memTable.segmentInfo(0);
+
+        assertThat(segmentInfo, is(notNullValue()));
+        assertThat(segmentInfo.getOffset(16), is(42));
+    }
+
+    @Test
+    void testSuffixAndPrefixTombstones() {
+        memTable.truncateSuffix(0, 15);
+
+        memTable.truncatePrefix(0, 10);
+
+        memTable.appendSegmentFileOffset(0, 16, 42);
+
+        SegmentInfo segmentInfo = memTable.segmentInfo(0);
+
+        assertThat(segmentInfo, is(notNullValue()));
+        assertThat(segmentInfo.getOffset(16), is(42));
+    }
+
+    @Test
+    void testMultiplePrefixTombstones() {
+        memTable.truncatePrefix(0, 10);
+
+        memTable.truncatePrefix(0, 15);
+
+        SegmentInfo segmentInfo = memTable.segmentInfo(0);
+
+        assertThat(segmentInfo, is(notNullValue()));
+        assertThat(segmentInfo.isPrefixTombstone(), is(true));
+        assertThat(segmentInfo.firstIndexKept(), is(15L));
     }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
index 4ef20956321..f3bd0972190 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
@@ -23,9 +23,8 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
@@ -38,6 +37,7 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
+import 
org.apache.ignite.internal.raft.storage.segstore.EntrySearchResult.SearchOutcome;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.InjectExecutorService;
@@ -110,7 +110,7 @@ class RaftLogCheckpointerTest extends 
BaseIgniteAbstractTest {
     @Test
     void testReadFromQueue() {
         // Read from empty queue.
-        assertThat(checkpointer.findSegmentPayloadInQueue(0, 0), 
is(nullValue()));
+        assertThat(checkpointer.findSegmentPayloadInQueue(0, 
0).searchOutcome(), is(SearchOutcome.CONTINUE_SEARCH));
 
         var blockFuture = new CompletableFuture<Void>();
 
@@ -137,22 +137,84 @@ class RaftLogCheckpointerTest extends 
BaseIgniteAbstractTest {
 
             for (int groupId = 0; groupId < MAX_QUEUE_SIZE; groupId++) {
                 for (int logIndex = 0; logIndex < MAX_QUEUE_SIZE; logIndex++) {
-                    ByteBuffer payload = 
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
+                    EntrySearchResult searchResult = 
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
 
                     if (groupId == logIndex) {
-                        assertThat(payload, is(notNullValue()));
+                        assertThat(searchResult.searchOutcome(), 
is(SearchOutcome.SUCCESS));
                     } else {
-                        assertThat(payload, is(nullValue()));
+                        assertThat(searchResult.searchOutcome(), 
anyOf(is(SearchOutcome.CONTINUE_SEARCH), is(SearchOutcome.NOT_FOUND)));
                     }
                 }
             }
 
-            assertThat(checkpointer.findSegmentPayloadInQueue(MAX_QUEUE_SIZE, 
MAX_QUEUE_SIZE), is(nullValue()));
+            assertThat(
+                    checkpointer.findSegmentPayloadInQueue(MAX_QUEUE_SIZE, 
MAX_QUEUE_SIZE).searchOutcome(),
+                    is(SearchOutcome.CONTINUE_SEARCH)
+            );
         } finally {
             blockFuture.complete(null);
         }
 
         // The queue should eventually become empty again.
-        await().until(() -> checkpointer.findSegmentPayloadInQueue(0, 0), 
is(nullValue()));
+        await().until(() -> checkpointer.findSegmentPayloadInQueue(0, 
0).searchOutcome(), is(SearchOutcome.CONTINUE_SEARCH));
+    }
+
+    @Test
+    void testFindSegmentPayloadReturnsBufferWhenOffsetPresent(@Mock 
SegmentFile mockFile, @Mock IndexMemTable mockMemTable) {
+        var blockFuture = new CompletableFuture<Void>();
+
+        try {
+            doAnswer(invocation -> blockFuture.join()).when(mockFile).sync();
+
+            ByteBuffer buffer = ByteBuffer.allocate(16);
+
+            when(mockFile.buffer()).thenReturn(buffer);
+
+            long groupId = 2;
+            long logIndex = 5;
+
+            var segmentInfo = new SegmentInfo(1);
+
+            for (int i = 1; i <= 10; i++) {
+                segmentInfo.addOffset(i, i);
+            }
+
+            when(mockMemTable.segmentInfo(groupId)).thenReturn(segmentInfo);
+
+            checkpointer.onRollover(mockFile, mockMemTable);
+
+            EntrySearchResult res = 
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
+
+            assertThat(res.searchOutcome(), is(SearchOutcome.SUCCESS));
+            assertThat(res.entryBuffer(), is(buffer));
+        } finally {
+            blockFuture.complete(null);
+        }
+    }
+
+    @Test
+    void testFindSegmentPayloadReturnsEmptyWhenPrefixTombstoneCutsOff(@Mock 
SegmentFile mockFile, @Mock IndexMemTable mockMemTable) {
+        var blockFuture = new CompletableFuture<Void>();
+
+        try {
+            doAnswer(invocation -> blockFuture.join()).when(mockFile).sync();
+
+            long groupId = 2;
+
+            SegmentInfo mockSegmentInfo = mock(SegmentInfo.class);
+
+            
when(mockMemTable.segmentInfo(groupId)).thenReturn(mockSegmentInfo);
+            when(mockSegmentInfo.lastLogIndexExclusive()).thenReturn(20L);
+            // Emulate prefix truncation from index 10.
+            when(mockSegmentInfo.firstIndexKept()).thenReturn(10L);
+
+            checkpointer.onRollover(mockFile, mockMemTable);
+
+            EntrySearchResult res = 
checkpointer.findSegmentPayloadInQueue(groupId, 5);
+
+            assertThat(res.searchOutcome(), is(SearchOutcome.NOT_FOUND));
+        } finally {
+            blockFuture.complete(null);
+        }
     }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index f2099afb89f..3ec7422d464 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -76,6 +76,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 @ExtendWith(ExecutorServiceExtension.class)
 class SegmentFileManagerTest extends IgniteAbstractTest {
@@ -375,6 +377,30 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
         }
     }
 
+    @Test
+    void truncateRecordIsWrittenOnPrefixTruncate() throws IOException {
+        long groupId = 36;
+
+        long firstLogIndexKept = 42;
+
+        fileManager.truncatePrefix(groupId, firstLogIndexKept);
+
+        Path path = findSoleSegmentFile();
+
+        ByteBuffer expectedTruncateRecord = 
ByteBuffer.allocate(SegmentPayload.TRUNCATE_PREFIX_RECORD_SIZE)
+                .order(SegmentFile.BYTE_ORDER);
+
+        SegmentPayload.writeTruncatePrefixRecordTo(expectedTruncateRecord, 
groupId, firstLogIndexKept);
+
+        expectedTruncateRecord.rewind();
+
+        try (SeekableByteChannel channel = Files.newByteChannel(path)) {
+            channel.position(HEADER_RECORD.length);
+
+            assertThat(readFully(channel, 
SegmentPayload.TRUNCATE_PREFIX_RECORD_SIZE), is(expectedTruncateRecord));
+        }
+    }
+
     @Test
     void testRecovery() throws Exception {
         int batchSize = FILE_SIZE / 4;
@@ -459,8 +485,9 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
         assertThat(indexFiles(), hasSize(1));
     }
 
-    @Test
-    void testRecoveryWithTruncateSuffix() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testTruncateSuffix(boolean restart) throws Exception {
         List<byte[]> batches = randomData(FILE_SIZE / 4, 10);
 
         for (int i = 0; i < batches.size(); i++) {
@@ -481,15 +508,17 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
 
         fileManager.truncateSuffix(GROUP_ID, lastLogIndexKept);
 
-        fileManager.close();
+        if (restart) {
+            fileManager.close();
 
-        for (Path indexFile : indexFiles()) {
-            Files.deleteIfExists(indexFile);
-        }
+            for (Path indexFile : indexFiles()) {
+                Files.deleteIfExists(indexFile);
+            }
 
-        fileManager = createFileManager();
+            fileManager = createFileManager();
 
-        fileManager.start();
+            fileManager.start();
+        }
 
         for (int i = 0; i <= lastLogIndexKept; i++) {
             byte[] expectedEntry = batches.get(i);
@@ -508,6 +537,55 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testTruncatePrefix(boolean restart) throws Exception {
+        List<byte[]> batches = randomData(FILE_SIZE / 4, 10);
+
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(batches.get(i), i);
+        }
+
+        await().until(this::indexFiles, hasSize(4));
+
+        int firstLogIndexKept = batches.size() / 2;
+
+        fileManager.truncatePrefix(GROUP_ID, firstLogIndexKept);
+
+        // Insert more data, just in case.
+        for (int i = 0; i < batches.size(); i++) {
+            appendBytes(batches.get(i), i + batches.size());
+        }
+
+        if (restart) {
+            fileManager.close();
+
+            for (Path indexFile : indexFiles()) {
+                Files.deleteIfExists(indexFile);
+            }
+
+            fileManager = createFileManager();
+
+            fileManager.start();
+        }
+
+        for (int i = 0; i < batches.size() - firstLogIndexKept; i++) {
+            byte[] expectedEntry = batches.get(firstLogIndexKept + i);
+
+            fileManager.getEntry(GROUP_ID, i, bs -> {
+                assertThat(bs, is(expectedEntry));
+
+                return null;
+            });
+        }
+
+        for (int i = 0; i < firstLogIndexKept; i++) {
+            fileManager.getEntry(GROUP_ID, i, bs -> {
+                throw new AssertionError("This method should not be called.");
+            });
+        }
+    }
+
     private Path findSoleSegmentFile() throws IOException {
         List<Path> segmentFiles = segmentFiles();
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
index aff053d6236..7d10cefdc54 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
@@ -133,4 +133,44 @@ class SegstoreLogStorageConcurrencyTest extends 
IgniteAbstractTest {
 
         runRace(writerTaskFactory.apply(1), writerTaskFactory.apply(2));
     }
+
+    @Test
+    void testFirstIndexAfterTruncatePrefix() {
+        List<LogEntry> entries = TestUtils.mockEntries();
+
+        IntFunction<RunnableX> writerTaskFactory = groupId -> () -> {
+            SegstoreLogStorage logStorage = newLogStorage(groupId);
+
+            try {
+                assertThat(logStorage.getFirstLogIndex(), is(1L));
+                assertThat(logStorage.getLastLogIndex(), is(0L));
+
+                long firstLogIndex = 0;
+
+                for (int i = 0; i < entries.size(); i++) {
+                    LogEntry entry = entries.get(i);
+
+                    logStorage.appendEntry(entry);
+
+                    long logIndex = entry.getId().getIndex();
+
+                    if (i > 0 && i % 10 == 0) {
+                        logStorage.truncatePrefix(logIndex);
+
+                        firstLogIndex = logIndex;
+                    }
+
+                    assertThat(logStorage.getFirstLogIndex(), 
is(firstLogIndex));
+                    assertThat(logStorage.getLastLogIndex(), is(logIndex));
+                }
+
+                assertThat(logStorage.getFirstLogIndex(), is(firstLogIndex));
+                assertThat(logStorage.getLastLogIndex(), is((long) 
entries.size() - 1));
+            } finally {
+                logStorage.shutdown();
+            }
+        };
+
+        runRace(writerTaskFactory.apply(1), writerTaskFactory.apply(2));
+    }
 }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index 0a7d28db343..34d4cc00f8f 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -66,12 +66,6 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
         super.testReset();
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26285";)
-    @Override
-    public void testTruncatePrefix() {
-        super.testTruncatePrefix();
-    }
-
     @ParameterizedTest
     // Number of entries is chosen to test scenarios with zero index files and 
with multiple index files.
     @ValueSource(ints = { 15, 100_000 })
@@ -97,6 +91,9 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
 
         logStorage.truncateSuffix(lastIndexKept);
 
+        assertThat(logStorage.getFirstLogIndex(), is(0L));
+        assertThat(logStorage.getLastLogIndex(), is(lastIndexKept));
+
         logStorage.shutdown();
         segmentFileManager.close();
 
@@ -106,4 +103,26 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
         assertThat(logStorage.getFirstLogIndex(), is(0L));
         assertThat(logStorage.getLastLogIndex(), is(lastIndexKept));
     }
+
+    @ParameterizedTest
+    @ValueSource(ints = { 15, 100_000 })
+    public void firstAndLastLogIndexAfterPrefixTruncateAndRestart(int 
numEntries) throws Exception {
+        logStorage.appendEntries(TestUtils.mockEntries(numEntries));
+
+        long firstIndexKept = numEntries / 2;
+
+        logStorage.truncatePrefix(firstIndexKept);
+
+        assertThat(logStorage.getFirstLogIndex(), is(firstIndexKept));
+        assertThat(logStorage.getLastLogIndex(), is((long) numEntries - 1));
+
+        logStorage.shutdown();
+        segmentFileManager.close();
+
+        logStorage = newLogStorage();
+        logStorage.init(newLogStorageOptions());
+
+        assertThat(logStorage.getFirstLogIndex(), is(firstIndexKept));
+        assertThat(logStorage.getLastLogIndex(), is((long) numEntries - 1));
+    }
 }

Reply via email to