rpuch commented on code in PR #7101:
URL: https://github.com/apache/ignite-3/pull/7101#discussion_r2589240695


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/EntrySearchResult.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.nio.ByteBuffer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class representing the result of an entry search in the log storage.
+ *
+ * <p>It is used to represent three conditions:
+ *
+ * <ol>
+ *     <li>If the corresponding method returns {@code null}, then the entry 
was not found and we may want to continue

Review Comment:
   How about returning a non-null marker for 'notOnThisLevel` instead of a null 
as well? It will make the code more readable. And the other option (`isEmpty`) 
could be renamed to something like `doesNotExist`



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java:
##########
@@ -64,14 +65,16 @@
  * +------------------------------------------------------------------+
  * </pre>
  *
- * <p>Raft group meta is as follows:
- * <pre>
- * 
+------------------------------------------------------------------------------------------------------------------------+-----+
- * |                                              Raft group 1 meta            
                                             | ... |
- * 
+------------------------------------------------------------------------------------------------------------------------+-----+
- * | Group ID (8 bytes) | Flags (4 bytes) | Payload Offset (4 bytes) | First 
Log Index (8 bytes) | Last Log Index (8 bytes) | ... |
- * 
+------------------------------------------------------------------------------------------------------------------------+-----+
- * </pre>
+ * <p>Each Raft group meta is as follows (written as a list, because the table 
doesn't fit the configured line length):
+ * <ol>
+ *     <li>Group ID (8 bytes);</li>
+ *     <li>Flags (4 bytes);</li>
+ *     <li>Payload offset (4 bytes);</li>
+ *     <li>First log index (8 bytes);</li>
+ *     <li>Last log index (8 bytes);</li>

Review Comment:
   Exclusive?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java:
##########
@@ -117,9 +136,43 @@ IndexFileMeta indexMeta(long logIndex) {
         return null;
     }
 
+    /**
+     * Removes all index metas that have log indices smaller than the given 
value.
+     */
+    void truncatePrefix(long firstLogIndexKept) {
+        Iterator<IndexMetaArrayHolder> it = fileMetaDeque.descendingIterator();

Review Comment:
   Please add a comment to `fileMetaDeque` to describe what its elements 
represent and in which order they are kept



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java:
##########
@@ -104,22 +104,34 @@ void onRollover(SegmentFile segmentFile, 
ReadModeIndexMemTable indexMemTable) {
 
     /**
      * Searches for the segment payload corresponding to the given Raft Group 
ID and Raft Log Index in the checkpoint queue.
-     *
-     * @return {@code ByteBuffer} which position is set to the start of the 
corresponding segment payload or {@code null} if the payload has
-     *         not been found in all files currently present in the queue.
      */
-    @Nullable ByteBuffer findSegmentPayloadInQueue(long groupId, long 
logIndex) {
+    @Nullable EntrySearchResult findSegmentPayloadInQueue(long groupId, long 
logIndex) {
         Iterator<Entry> it = queue.tailIterator();
 
         while (it.hasNext()) {
             Entry e = it.next();
 
             SegmentInfo segmentInfo = e.memTable().segmentInfo(groupId);
 
-            int segmentPayloadOffset = segmentInfo == null ? 0 : 
segmentInfo.getOffset(logIndex);
+            if (segmentInfo == null) {
+                continue;
+            }
+
+            if (segmentInfo.lastLogIndexExclusive() <= logIndex) {
+                return EntrySearchResult.empty();
+            }
+
+            if (segmentInfo.firstIndexKept() > logIndex) {
+                // This is a prefix tombstone and it cuts off the log index we 
search for.
+                return EntrySearchResult.empty();
+            }
+
+            int segmentPayloadOffset = segmentInfo.getOffset(logIndex);
 
             if (segmentPayloadOffset != 0) {

Review Comment:
   How about introducing a constant for 0, called `INDEX_NOT_IN_SEGMENT`?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java:
##########
@@ -56,13 +58,23 @@ ArrayWithSize add(int element) {
             return new ArrayWithSize(array, size + 1);
         }
 
-        ArrayWithSize truncate(int newSize) {
+        ArrayWithSize truncateSuffix(int newSize) {
+            return truncate(0, newSize);
+        }
+
+        ArrayWithSize truncatePrefix(int newSize) {
+            int srcPos = size - newSize;
+
+            return truncate(srcPos, newSize);
+        }
+
+        private ArrayWithSize truncate(int srcPos, int newSize) {
             assert newSize <= size
                     : String.format("Array must shrink on truncation, current 
size: %d, size after truncation: %d", size, newSize);
 
-            int[] newArray = new int[size];
+            int[] newArray = new int[array.length];

Review Comment:
   Do we allocate an array of the same size? Why?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java:
##########
@@ -63,23 +69,40 @@ public void appendSegmentFileOffset(long groupId, long 
logIndex, int segmentFile
 
     @Override
     public SegmentInfo segmentInfo(long groupId) {
-        return stripe(groupId).memTable.get(groupId);
+        return memtable(groupId).get(groupId);
     }
 
     @Override
     public void truncateSuffix(long groupId, long lastLogIndexKept) {
-        ConcurrentMap<Long, SegmentInfo> memtable = stripe(groupId).memTable;
-
-        SegmentInfo segmentInfo = memtable.get(groupId);
+        ConcurrentMap<Long, SegmentInfo> memtable = memtable(groupId);
+
+        memtable.compute(groupId, (id, segmentInfo) -> {
+            if (segmentInfo == null || lastLogIndexKept < 
segmentInfo.firstLogIndexInclusive()) {
+                // If the current memtable does not have information for the 
given group or if we are truncating everything currently
+                // present in the memtable, we need to write a special "empty" 
SegmentInfo into the memtable to override existing persisted
+                // data during search.
+                return new SegmentInfo(lastLogIndexKept + 1);

Review Comment:
   Why is 1 added here?



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java:
##########
@@ -248,4 +249,107 @@ void testTruncateIntoThePast() {
         assertThat(memTable.segmentInfo(0).getOffset(12), is(0));
         assertThat(memTable.segmentInfo(0).getOffset(36), is(0));
     }
+
+    @Test
+    void testTruncatePrefix() {
+        long groupId0 = 1;
+        long groupId1 = 2;

Review Comment:
   ```suggestion
           long groupId1 = 1;
           long groupId2 = 2;
   ```



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java:
##########
@@ -106,4 +103,26 @@ public void 
firstAndLastLogIndexAfterSuffixTruncateAndRestart(int numEntries) th
         assertThat(logStorage.getFirstLogIndex(), is(0L));
         assertThat(logStorage.getLastLogIndex(), is(lastIndexKept));
     }
+
+    @ParameterizedTest
+    @ValueSource(ints = { 15, 100_000 })
+    public void firstAndLastLogIndexAfterPrefixTruncateAndRestart(int 
numEntries) throws Exception {
+        logStorage.appendEntries(TestUtils.mockEntries(numEntries));
+
+        long lastIndexKept = numEntries / 2;

Review Comment:
   ```suggestion
           long firstIndexKept = numEntries / 2;
   ```



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java:
##########
@@ -137,12 +137,12 @@ void testReadFromQueue() {
 
             for (int groupId = 0; groupId < MAX_QUEUE_SIZE; groupId++) {
                 for (int logIndex = 0; logIndex < MAX_QUEUE_SIZE; logIndex++) {
-                    ByteBuffer payload = 
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
+                    EntrySearchResult searchResult = 
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
 
                     if (groupId == logIndex) {
-                        assertThat(payload, is(notNullValue()));
+                        assertThat(searchResult != null && 
!searchResult.isEmpty(), is(true));

Review Comment:
   Let's split this into 2 assertions



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java:
##########
@@ -533,13 +531,21 @@ private static boolean endOfSegmentReached(ByteBuffer 
buffer) {
      * possibly incomplete segment file.
      */
     private WriteModeIndexMemTable recoverLatestMemtable(SegmentFile 
segmentFile, Path segmentFilePath) {
+        return recoverMemtable(segmentFile, segmentFilePath, true);
+    }
+
+    private WriteModeIndexMemTable recoverMemtable(SegmentFile segmentFile, 
Path segmentFilePath) {
+        return recoverMemtable(segmentFile, segmentFilePath, false);

Review Comment:
   CRC is not validated here as this is not the latest segment and we are sure 
it's fully synced, right? Please add a comment about this



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/EntrySearchResult.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.nio.ByteBuffer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class representing the result of an entry search in the log storage.
+ *
+ * <p>It is used to represent three conditions:
+ *
+ * <ol>
+ *     <li>If the corresponding method returns {@code null}, then the entry 
was not found and we may want to continue
+ *     looking in other places;</li>
+ *     <li>If the method returns an {@link #isEmpty} instance, then the entry 
was not found but we know for sure that it does
+ *     not exist in the storage;</li>
+ *     <li>If the method returns a non-empty instance, then the corresponding 
entry has been found successfully.</li>
+ * </ol>
+ */
+class EntrySearchResult {
+    private static final EntrySearchResult EMPTY = new EntrySearchResult();
+
+    @Nullable
+    private final ByteBuffer entryBuffer;
+
+    @SuppressWarnings("NullableProblems") // We don't want to accept nulls 
here.

Review Comment:
   So what are we suppressing if we don't want to accept nulls?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java:
##########
@@ -106,10 +114,50 @@ IndexFileMeta find(long logIndex) {
             } else if (logIndex >= midValue.lastLogIndexExclusive()) {
                 lowArrayIndex = middleArrayIndex + 1;
             } else {
-                return midValue;
+                return middleArrayIndex;
             }
         }
 
-        return null;
+        return -1;
+    }
+
+    IndexFileMetaArray truncateIndicesSmallerThan(long firstLogIndexKept) {
+        int firstLogIndexKeptArrayIndex = findArrayIndex(firstLogIndexKept);
+
+        assert firstLogIndexKeptArrayIndex >= 0 : String.format(
+                "Missing entry for log index %d in range [%d:%d).",
+                firstLogIndexKept, firstLogIndexInclusive(), 
lastLogIndexExclusive()
+        );
+
+        IndexFileMeta metaToUpdate = array[firstLogIndexKeptArrayIndex];
+
+        int numEntriesToSkip = toIntExact(firstLogIndexKept - 
metaToUpdate.firstLogIndexInclusive());
+
+        assert numEntriesToSkip >= 0 : String.format(
+                "Trying to do a no-op prefix truncate from index %d in range 
[%d:%d).",
+                firstLogIndexKept, firstLogIndexInclusive(), 
lastLogIndexExclusive()
+        );
+
+        // Move the payload offset pointer to skip truncated entries (each 
entry is 4 bytes).
+        int adjustedPayloadOffset = metaToUpdate.indexFilePayloadOffset() + 
numEntriesToSkip * Integer.BYTES;
+
+        var trimmedMeta = new IndexFileMeta(
+                firstLogIndexKept,
+                metaToUpdate.lastLogIndexExclusive(),
+                adjustedPayloadOffset,
+                metaToUpdate.indexFileOrdinal()
+        );
+
+        IndexFileMeta[] newArray = new IndexFileMeta[array.length];
+
+        newArray[0] = trimmedMeta;
+
+        int newSize = size - firstLogIndexKeptArrayIndex;
+
+        if (newSize > 1) {
+            System.arraycopy(array, firstLogIndexKeptArrayIndex + 1, newArray, 
1, newSize - 1);

Review Comment:
   ```suggestion
           if (newSize > 1) {
               // Copy elements following the one hosting firstLogIndexKept 
(which becomes the first one).
               System.arraycopy(array, firstLogIndexKeptArrayIndex + 1, 
newArray, 1, newSize - 1);
   ```



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java:
##########
@@ -293,33 +301,76 @@ private byte[] 
serializeHeaderAndFillMetadata(ReadModeIndexMemTable indexMemTabl
 
             long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive();
 
+            long firstIndexKept = segmentInfo.firstIndexKept();
+
             // On recovery we are only creating missing index files, in-memory 
meta will be created on Index File Manager start.
             if (!onRecovery) {
-                var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, fileOrdinal);
+                IndexFileMeta indexFileMeta = createIndexFileMeta(
+                        firstLogIndexInclusive, lastLogIndexExclusive, 
firstIndexKept, payloadOffset, fileOrdinal
+                );
 
-                putIndexFileMeta(groupId, indexFileMeta);
+                putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
             }
 
             headerBuffer
                     .putLong(groupId)
                     .putInt(0) // Flags.
                     .putInt(payloadOffset)
                     .putLong(firstLogIndexInclusive)
-                    .putLong(lastLogIndexExclusive);
+                    .putLong(lastLogIndexExclusive)
+                    .putLong(firstIndexKept);
 
             payloadOffset += payloadSize(segmentInfo);
         }
 
         return headerBuffer.array();
     }
 
-    private void putIndexFileMeta(Long groupId, IndexFileMeta indexFileMeta) {
+    private static @Nullable IndexFileMeta createIndexFileMeta(
+            long firstLogIndexInclusive,
+            long lastLogIndexExclusive,
+            long firstIndexKept,
+            int payloadOffset,
+            int fileOrdinal
+    ) {
+        if (firstLogIndexInclusive == -1) {
+            assert firstIndexKept != -1 : "Expected a prefix tombstone, but 
firstIndexKept is not set.";

Review Comment:
   Let's create a constant called `PREFIX_TOMBSTONE`



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java:
##########
@@ -293,33 +301,76 @@ private byte[] 
serializeHeaderAndFillMetadata(ReadModeIndexMemTable indexMemTabl
 
             long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive();
 
+            long firstIndexKept = segmentInfo.firstIndexKept();
+
             // On recovery we are only creating missing index files, in-memory 
meta will be created on Index File Manager start.
             if (!onRecovery) {
-                var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, fileOrdinal);
+                IndexFileMeta indexFileMeta = createIndexFileMeta(
+                        firstLogIndexInclusive, lastLogIndexExclusive, 
firstIndexKept, payloadOffset, fileOrdinal
+                );
 
-                putIndexFileMeta(groupId, indexFileMeta);
+                putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
             }
 
             headerBuffer
                     .putLong(groupId)
                     .putInt(0) // Flags.
                     .putInt(payloadOffset)
                     .putLong(firstLogIndexInclusive)
-                    .putLong(lastLogIndexExclusive);
+                    .putLong(lastLogIndexExclusive)
+                    .putLong(firstIndexKept);
 
             payloadOffset += payloadSize(segmentInfo);
         }
 
         return headerBuffer.array();
     }
 
-    private void putIndexFileMeta(Long groupId, IndexFileMeta indexFileMeta) {
+    private static @Nullable IndexFileMeta createIndexFileMeta(
+            long firstLogIndexInclusive,
+            long lastLogIndexExclusive,
+            long firstIndexKept,
+            int payloadOffset,
+            int fileOrdinal
+    ) {
+        if (firstLogIndexInclusive == -1) {
+            assert firstIndexKept != -1 : "Expected a prefix tombstone, but 
firstIndexKept is not set.";
+
+            // This is a "prefix tombstone", no need to create any meta, we 
will just truncate the prefix.
+            return null;
+        }
+
+        if (firstIndexKept == -1 || firstIndexKept <= firstLogIndexInclusive) {
+            // No prefix truncation required, simply create a new meta.
+            return new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, fileOrdinal);
+        }
+
+        // Create a meta with a truncated prefix.
+        int numEntriesToSkip = toIntExact(firstIndexKept - 
firstLogIndexInclusive);
+
+        int adjustedPayloadOffset = payloadOffset + numEntriesToSkip * 
Integer.BYTES;
+
+        return new IndexFileMeta(firstIndexKept, lastLogIndexExclusive, 
adjustedPayloadOffset, fileOrdinal);
+    }
+
+    private void putIndexFileMeta(Long groupId, @Nullable IndexFileMeta 
indexFileMeta, long firstIndexKept) {
         GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId);
 
         if (existingGroupIndexMeta == null) {
-            groupIndexMetas.put(groupId, new GroupIndexMeta(indexFileMeta));
+            if (indexFileMeta != null) {
+                groupIndexMetas.put(groupId, new 
GroupIndexMeta(indexFileMeta));
+            }
         } else {
-            existingGroupIndexMeta.addIndexMeta(indexFileMeta);
+            if (firstIndexKept != -1) {
+                existingGroupIndexMeta.truncatePrefix(firstIndexKept);
+            }
+
+            if (indexFileMeta != null) {
+                // New index meta must have already been truncated according 
to the prefix tombstone.

Review Comment:
   Why is it 'according to...'?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java:
##########
@@ -106,10 +114,50 @@ IndexFileMeta find(long logIndex) {
             } else if (logIndex >= midValue.lastLogIndexExclusive()) {
                 lowArrayIndex = middleArrayIndex + 1;
             } else {
-                return midValue;
+                return middleArrayIndex;
             }
         }
 
-        return null;
+        return -1;
+    }
+
+    IndexFileMetaArray truncateIndicesSmallerThan(long firstLogIndexKept) {
+        int firstLogIndexKeptArrayIndex = findArrayIndex(firstLogIndexKept);
+
+        assert firstLogIndexKeptArrayIndex >= 0 : String.format(
+                "Missing entry for log index %d in range [%d:%d).",
+                firstLogIndexKept, firstLogIndexInclusive(), 
lastLogIndexExclusive()
+        );
+
+        IndexFileMeta metaToUpdate = array[firstLogIndexKeptArrayIndex];
+
+        int numEntriesToSkip = toIntExact(firstLogIndexKept - 
metaToUpdate.firstLogIndexInclusive());
+
+        assert numEntriesToSkip >= 0 : String.format(
+                "Trying to do a no-op prefix truncate from index %d in range 
[%d:%d).",
+                firstLogIndexKept, firstLogIndexInclusive(), 
lastLogIndexExclusive()
+        );
+
+        // Move the payload offset pointer to skip truncated entries (each 
entry is 4 bytes).
+        int adjustedPayloadOffset = metaToUpdate.indexFilePayloadOffset() + 
numEntriesToSkip * Integer.BYTES;

Review Comment:
   Are those 4 bytes the same 4 bytes used in `IndexFileManager`? Anyway, let's 
use a constant with a descriptive name



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java:
##########
@@ -104,22 +104,34 @@ void onRollover(SegmentFile segmentFile, 
ReadModeIndexMemTable indexMemTable) {
 
     /**
      * Searches for the segment payload corresponding to the given Raft Group 
ID and Raft Log Index in the checkpoint queue.
-     *
-     * @return {@code ByteBuffer} which position is set to the start of the 
corresponding segment payload or {@code null} if the payload has
-     *         not been found in all files currently present in the queue.
      */
-    @Nullable ByteBuffer findSegmentPayloadInQueue(long groupId, long 
logIndex) {
+    @Nullable EntrySearchResult findSegmentPayloadInQueue(long groupId, long 
logIndex) {
         Iterator<Entry> it = queue.tailIterator();
 
         while (it.hasNext()) {
             Entry e = it.next();
 
             SegmentInfo segmentInfo = e.memTable().segmentInfo(groupId);
 
-            int segmentPayloadOffset = segmentInfo == null ? 0 : 
segmentInfo.getOffset(logIndex);
+            if (segmentInfo == null) {
+                continue;
+            }
+
+            if (segmentInfo.lastLogIndexExclusive() <= logIndex) {

Review Comment:
   Would it make sense to flip this and the following comparisons, butting 
`logIndex` on the left? It seems more natural to put 'the thing that we are 
checking' on the left and the boundary on the right



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java:
##########
@@ -293,33 +301,76 @@ private byte[] 
serializeHeaderAndFillMetadata(ReadModeIndexMemTable indexMemTabl
 
             long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive();
 
+            long firstIndexKept = segmentInfo.firstIndexKept();
+
             // On recovery we are only creating missing index files, in-memory 
meta will be created on Index File Manager start.
             if (!onRecovery) {
-                var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, fileOrdinal);
+                IndexFileMeta indexFileMeta = createIndexFileMeta(
+                        firstLogIndexInclusive, lastLogIndexExclusive, 
firstIndexKept, payloadOffset, fileOrdinal
+                );
 
-                putIndexFileMeta(groupId, indexFileMeta);
+                putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
             }
 
             headerBuffer
                     .putLong(groupId)
                     .putInt(0) // Flags.
                     .putInt(payloadOffset)
                     .putLong(firstLogIndexInclusive)
-                    .putLong(lastLogIndexExclusive);
+                    .putLong(lastLogIndexExclusive)
+                    .putLong(firstIndexKept);
 
             payloadOffset += payloadSize(segmentInfo);
         }
 
         return headerBuffer.array();
     }
 
-    private void putIndexFileMeta(Long groupId, IndexFileMeta indexFileMeta) {
+    private static @Nullable IndexFileMeta createIndexFileMeta(
+            long firstLogIndexInclusive,
+            long lastLogIndexExclusive,
+            long firstIndexKept,
+            int payloadOffset,
+            int fileOrdinal
+    ) {
+        if (firstLogIndexInclusive == -1) {
+            assert firstIndexKept != -1 : "Expected a prefix tombstone, but 
firstIndexKept is not set.";
+
+            // This is a "prefix tombstone", no need to create any meta, we 
will just truncate the prefix.
+            return null;
+        }
+
+        if (firstIndexKept == -1 || firstIndexKept <= firstLogIndexInclusive) {
+            // No prefix truncation required, simply create a new meta.
+            return new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, fileOrdinal);
+        }
+
+        // Create a meta with a truncated prefix.
+        int numEntriesToSkip = toIntExact(firstIndexKept - 
firstLogIndexInclusive);
+
+        int adjustedPayloadOffset = payloadOffset + numEntriesToSkip * 
Integer.BYTES;

Review Comment:
   Let's create a constant for Integer.BYTES here



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java:
##########
@@ -556,11 +562,24 @@ private WriteModeIndexMemTable 
recoverLatestMemtable(SegmentFile segmentFile, Pa
                 buffer.position(segmentFilePayloadOffset);
 
                 // CRC violation signals the end of meaningful data in the 
segment file.
-                if (!isCrcValid(buffer, crcPosition)) {
+                if (validateCrc && !isCrcValid(buffer, crcPosition)) {

Review Comment:
   This block is duplicated for each of 3 entry types. How about introducing 
the entry type explicitly in code? This would help to have the recovery code 
nicely packaged in 3 classes; this will probably allow to get rid of this 
duplication and also make the object model more obvious



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java:
##########
@@ -337,17 +364,22 @@ private WriteBufferWithMemtable 
reserveBytesWithRollover(int size) throws IOExce
      * storage, not taking pending in-memory state into account.
      */
     long firstLogIndexInclusiveOnRecovery(long groupId) {
+        SegmentFileWithMemtable currentSegmentFile = 
this.currentSegmentFile.get();
+
+        SegmentInfo segmentInfo = 
currentSegmentFile.memtable().segmentInfo(groupId);
+
+        // We need to consult with the latest memtable in case it contains a 
prefix tombstone.
+        if (segmentInfo != null && segmentInfo.firstIndexKept() != -1) {
+            return segmentInfo.firstIndexKept();
+        }
+
         long firstLogIndexFromIndexStorage = 
indexFileManager.firstLogIndexInclusive(groupId);

Review Comment:
   Why do we look at the index if we have the latest memtable?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java:
##########
@@ -168,29 +201,74 @@ int size() {
     void saveOffsetsTo(ByteBuffer buffer) {
         ArrayWithSize offsets = segmentFileOffsets;
 
+        assert offsets.size() > 0 : "Offsets array must not be empty";
+
         buffer.asIntBuffer().put(offsets.array, 0, offsets.size);
     }
 
     /**
      * Removes all data which log indices are strictly greater than {@code 
lastLogIndexKept}.
      */
-    void truncateSuffix(long lastLogIndexKept) {
+    SegmentInfo truncateSuffix(long lastLogIndexKept) {
         assert lastLogIndexKept >= logIndexBase : 
String.format("logIndexBase=%d, lastLogIndexKept=%d", logIndexBase, 
lastLogIndexKept);
 
         ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
 
-        long newSize = lastLogIndexKept - logIndexBase + 1;
+        long lastLogIndexExclusive = logIndexBase + segmentFileOffsets.size();
 
-        // Not using an assertion here, because this value comes doesn't come 
from the storage code.
-        if (newSize > segmentFileOffsets.size()) {
+        // Not using an assertion here, because this value doesn't come from 
the storage code.
+        if (lastLogIndexKept >= lastLogIndexExclusive) {
             throw new IllegalArgumentException(String.format(
                     "lastLogIndexKept is too large. Last index in memtable: 
%d, lastLogIndexKept: %d",
-                    logIndexBase + segmentFileOffsets.size() - 1, 
lastLogIndexKept
+                    lastLogIndexExclusive - 1, lastLogIndexKept
             ));
         }
 
-        ArrayWithSize newSegmentFileOffsets = 
segmentFileOffsets.truncate((int) newSize);
+        int newSize = toIntExact(lastLogIndexKept - logIndexBase + 1);
+
+        setSegmentFileOffsets(segmentFileOffsets, 
segmentFileOffsets.truncateSuffix(newSize));
+
+        // This could have been a "void" method, but this way it looks 
consistent with "truncatePrefix".
+        return this;
+    }
+
+    /**
+     * Removes all data which log indices are strictly smaller than {@code 
firstIndexKept}.
+     */
+    SegmentInfo truncatePrefix(long firstIndexKept) {
+        if (isPrefixTombstone()) {
+            if (this.firstIndexKept >= firstIndexKept) {

Review Comment:
   Let's flip this comparison



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java:
##########
@@ -168,29 +201,74 @@ int size() {
     void saveOffsetsTo(ByteBuffer buffer) {
         ArrayWithSize offsets = segmentFileOffsets;
 
+        assert offsets.size() > 0 : "Offsets array must not be empty";
+
         buffer.asIntBuffer().put(offsets.array, 0, offsets.size);
     }
 
     /**
      * Removes all data which log indices are strictly greater than {@code 
lastLogIndexKept}.
      */
-    void truncateSuffix(long lastLogIndexKept) {
+    SegmentInfo truncateSuffix(long lastLogIndexKept) {
         assert lastLogIndexKept >= logIndexBase : 
String.format("logIndexBase=%d, lastLogIndexKept=%d", logIndexBase, 
lastLogIndexKept);
 
         ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
 
-        long newSize = lastLogIndexKept - logIndexBase + 1;
+        long lastLogIndexExclusive = logIndexBase + segmentFileOffsets.size();
 
-        // Not using an assertion here, because this value comes doesn't come 
from the storage code.
-        if (newSize > segmentFileOffsets.size()) {
+        // Not using an assertion here, because this value doesn't come from 
the storage code.
+        if (lastLogIndexKept >= lastLogIndexExclusive) {
             throw new IllegalArgumentException(String.format(
                     "lastLogIndexKept is too large. Last index in memtable: 
%d, lastLogIndexKept: %d",
-                    logIndexBase + segmentFileOffsets.size() - 1, 
lastLogIndexKept
+                    lastLogIndexExclusive - 1, lastLogIndexKept
             ));
         }
 
-        ArrayWithSize newSegmentFileOffsets = 
segmentFileOffsets.truncate((int) newSize);
+        int newSize = toIntExact(lastLogIndexKept - logIndexBase + 1);
+
+        setSegmentFileOffsets(segmentFileOffsets, 
segmentFileOffsets.truncateSuffix(newSize));
+
+        // This could have been a "void" method, but this way it looks 
consistent with "truncatePrefix".
+        return this;

Review Comment:
   Why is this consistency important? Would it be better to make it void, but 
explain in a comment why the methods look differently?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java:
##########
@@ -40,12 +40,21 @@ class SegmentPayload {
     /**
      * Length of the byte sequence that is written when suffix truncation 
happens.
      *
-     * <p>Format: {@code groupId, 0 (special length value), last kept index, 
crc}
+     * <p>Format: {@code groupId, TRUNCATE_SUFFIX_RECORD_MARKER (special 
length value), last kept index, crc}
      */
     static final int TRUNCATE_SUFFIX_RECORD_SIZE = GROUP_ID_SIZE_BYTES + 
LENGTH_SIZE_BYTES + Long.BYTES + HASH_SIZE_BYTES;

Review Comment:
   Sometimes it's CRC, sometimes it's 'hash'. Those are technically different 
terms. Would it make sense to stick with just one of them for consistency?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java:
##########
@@ -168,29 +201,74 @@ int size() {
     void saveOffsetsTo(ByteBuffer buffer) {
         ArrayWithSize offsets = segmentFileOffsets;
 
+        assert offsets.size() > 0 : "Offsets array must not be empty";
+
         buffer.asIntBuffer().put(offsets.array, 0, offsets.size);
     }
 
     /**
      * Removes all data which log indices are strictly greater than {@code 
lastLogIndexKept}.
      */
-    void truncateSuffix(long lastLogIndexKept) {
+    SegmentInfo truncateSuffix(long lastLogIndexKept) {
         assert lastLogIndexKept >= logIndexBase : 
String.format("logIndexBase=%d, lastLogIndexKept=%d", logIndexBase, 
lastLogIndexKept);
 
         ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
 
-        long newSize = lastLogIndexKept - logIndexBase + 1;
+        long lastLogIndexExclusive = logIndexBase + segmentFileOffsets.size();
 
-        // Not using an assertion here, because this value comes doesn't come 
from the storage code.
-        if (newSize > segmentFileOffsets.size()) {
+        // Not using an assertion here, because this value doesn't come from 
the storage code.
+        if (lastLogIndexKept >= lastLogIndexExclusive) {
             throw new IllegalArgumentException(String.format(
                     "lastLogIndexKept is too large. Last index in memtable: 
%d, lastLogIndexKept: %d",
-                    logIndexBase + segmentFileOffsets.size() - 1, 
lastLogIndexKept
+                    lastLogIndexExclusive - 1, lastLogIndexKept
             ));
         }
 
-        ArrayWithSize newSegmentFileOffsets = 
segmentFileOffsets.truncate((int) newSize);
+        int newSize = toIntExact(lastLogIndexKept - logIndexBase + 1);
+
+        setSegmentFileOffsets(segmentFileOffsets, 
segmentFileOffsets.truncateSuffix(newSize));
+
+        // This could have been a "void" method, but this way it looks 
consistent with "truncatePrefix".
+        return this;
+    }
+
+    /**
+     * Removes all data which log indices are strictly smaller than {@code 
firstIndexKept}.
+     */
+    SegmentInfo truncatePrefix(long firstIndexKept) {
+        if (isPrefixTombstone()) {
+            if (this.firstIndexKept >= firstIndexKept) {
+                throw new IllegalStateException(String.format(
+                        "Trying to truncate an already truncated prefix 
[curFirstIndexKept=%d, newFirstIndexKept=%d]",
+                        this.firstIndexKept, firstIndexKept
+                ));
+            }
+
+            return prefixTombstone(firstIndexKept);
+        }
+
+        ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
+
+        if (firstIndexKept < logIndexBase) {
+            return new SegmentInfo(logIndexBase, firstIndexKept, 
segmentFileOffsets);
+        }

Review Comment:
   What does this case mean?



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java:
##########
@@ -137,12 +137,12 @@ void testReadFromQueue() {
 
             for (int groupId = 0; groupId < MAX_QUEUE_SIZE; groupId++) {
                 for (int logIndex = 0; logIndex < MAX_QUEUE_SIZE; logIndex++) {
-                    ByteBuffer payload = 
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
+                    EntrySearchResult searchResult = 
checkpointer.findSegmentPayloadInQueue(groupId, logIndex);
 
                     if (groupId == logIndex) {
-                        assertThat(payload, is(notNullValue()));
+                        assertThat(searchResult != null && 
!searchResult.isEmpty(), is(true));
                     } else {
-                        assertThat(payload, is(nullValue()));
+                        assertThat(searchResult == null || 
searchResult.isEmpty(), is(true));

Review Comment:
   Two assertions again



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java:
##########
@@ -195,22 +195,83 @@ void testOneWriterMultipleReadersWithOverlaps() {
                             expectedFileOrdinal
                     );
 
-                    var expectedMetaWithoutOverlap = new IndexFileMeta(
-                            expectedFirstLogIndex + overlap - 
logEntriesPerFile,
-                            expectedFirstLogIndex + overlap - 1,
-                            0,
-                            expectedFileOrdinal - 1
-                    );
-
-                    // We can possibly be reading from two different metas - 
from the newer one (that overlaps the older one) or
-                    // the older one.
-                    assertThat(
-                            logIndex + " -> " + indexFileMeta,
-                            indexFileMeta, 
either(is(expectedMetaWithOverlap)).or(is(expectedMetaWithoutOverlap)));
+                    if (expectedFirstLogIndex == 0) {
+                        assertThat(logIndex + " -> " + indexFileMeta, 
indexFileMeta, is(expectedMetaWithOverlap));
+                    } else {
+                        var expectedMetaWithoutOverlap = new IndexFileMeta(
+                                expectedFirstLogIndex + overlap - 
logEntriesPerFile,
+                                expectedFirstLogIndex + overlap - 1,
+                                0,
+                                expectedFileOrdinal - 1
+                        );
+
+                        // We can possibly be reading from two different metas 
- from the newer one (that overlaps the older one) or
+                        // the older one.
+                        assertThat(
+                                logIndex + " -> " + indexFileMeta,
+                                indexFileMeta, 
either(is(expectedMetaWithOverlap)).or(is(expectedMetaWithoutOverlap))
+                        );
+                    }
                 }
             }
         };
 
         runRace(writer, reader, reader, reader);
     }
+
+    @Test
+    void testTruncatePrefix() {
+        var meta1 = new IndexFileMeta(1, 100, 0, 0);
+        var meta2 = new IndexFileMeta(42, 100, 42, 1);
+        var meta3 = new IndexFileMeta(100, 120, 66, 2);
+        var meta4 = new IndexFileMeta(110, 200, 95, 3);
+
+        var groupMeta = new GroupIndexMeta(meta1);
+
+        groupMeta.addIndexMeta(meta2);
+        groupMeta.addIndexMeta(meta3);
+        groupMeta.addIndexMeta(meta4);
+
+        assertThat(groupMeta.firstLogIndexInclusive(), is(1L));
+        assertThat(groupMeta.lastLogIndexExclusive(), is(200L));
+
+        assertThat(groupMeta.indexMeta(10), is(meta1));
+        assertThat(groupMeta.indexMeta(43), is(meta2));

Review Comment:
   ```suggestion
           assertThat(groupMeta.indexMeta(42), is(meta2));
   ```



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java:
##########
@@ -155,4 +155,64 @@ void testReadFromQueue() {
         // The queue should eventually become empty again.
         await().until(() -> checkpointer.findSegmentPayloadInQueue(0, 0), 
is(nullValue()));
     }
+
+    @Test
+    void testFindSegmentPayloadReturnsBufferWhenOffsetPresent(@Mock 
SegmentFile mockFile, @Mock IndexMemTable mockMemTable) {
+        var blockFuture = new CompletableFuture<Void>();
+
+        try {
+            doAnswer(invocation -> blockFuture.join()).when(mockFile).sync();
+
+            ByteBuffer buffer = ByteBuffer.allocate(16);
+
+            when(mockFile.buffer()).thenReturn(buffer);
+
+            long groupId = 2;
+            long logIndex = 5;
+
+            SegmentInfo mockSegmentInfo = mock(SegmentInfo.class);

Review Comment:
   Can a real object be used instead of this mock?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java:
##########
@@ -293,33 +301,76 @@ private byte[] 
serializeHeaderAndFillMetadata(ReadModeIndexMemTable indexMemTabl
 
             long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive();
 
+            long firstIndexKept = segmentInfo.firstIndexKept();
+
             // On recovery we are only creating missing index files, in-memory 
meta will be created on Index File Manager start.
             if (!onRecovery) {
-                var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, 
lastLogIndexExclusive, payloadOffset, fileOrdinal);
+                IndexFileMeta indexFileMeta = createIndexFileMeta(
+                        firstLogIndexInclusive, lastLogIndexExclusive, 
firstIndexKept, payloadOffset, fileOrdinal
+                );
 
-                putIndexFileMeta(groupId, indexFileMeta);
+                putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
             }
 
             headerBuffer
                     .putLong(groupId)
                     .putInt(0) // Flags.
                     .putInt(payloadOffset)
                     .putLong(firstLogIndexInclusive)
-                    .putLong(lastLogIndexExclusive);
+                    .putLong(lastLogIndexExclusive)
+                    .putLong(firstIndexKept);
 
             payloadOffset += payloadSize(segmentInfo);
         }
 
         return headerBuffer.array();
     }
 
-    private void putIndexFileMeta(Long groupId, IndexFileMeta indexFileMeta) {
+    private static @Nullable IndexFileMeta createIndexFileMeta(
+            long firstLogIndexInclusive,
+            long lastLogIndexExclusive,
+            long firstIndexKept,
+            int payloadOffset,
+            int fileOrdinal
+    ) {
+        if (firstLogIndexInclusive == -1) {
+            assert firstIndexKept != -1 : "Expected a prefix tombstone, but 
firstIndexKept is not set.";

Review Comment:
   For another `-1` it could be `NO_INDEX`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to