rpuch commented on code in PR #7101: URL: https://github.com/apache/ignite-3/pull/7101#discussion_r2589240695
########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/EntrySearchResult.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.segstore; + +import java.nio.ByteBuffer; +import org.jetbrains.annotations.Nullable; + +/** + * Class representing the result of an entry search in the log storage. + * + * <p>It is used to represent three conditions: + * + * <ol> + * <li>If the corresponding method returns {@code null}, then the entry was not found and we may want to continue Review Comment: How about returning a non-null marker for 'notOnThisLevel` instead of a null as well? It will make the code more readable. 
And the other option (`isEmpty`) could be renamed to something like `doesNotExist` ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java: ########## @@ -64,14 +65,16 @@ * +------------------------------------------------------------------+ * </pre> * - * <p>Raft group meta is as follows: - * <pre> - * +------------------------------------------------------------------------------------------------------------------------+-----+ - * | Raft group 1 meta | ... | - * +------------------------------------------------------------------------------------------------------------------------+-----+ - * | Group ID (8 bytes) | Flags (4 bytes) | Payload Offset (4 bytes) | First Log Index (8 bytes) | Last Log Index (8 bytes) | ... | - * +------------------------------------------------------------------------------------------------------------------------+-----+ - * </pre> + * <p>Each Raft group meta is as follows (written as a list, because the table doesn't fit the configured line length): + * <ol> + * <li>Group ID (8 bytes);</li> + * <li>Flags (4 bytes);</li> + * <li>Payload offset (4 bytes);</li> + * <li>First log index (8 bytes);</li> + * <li>Last log index (8 bytes);</li> Review Comment: Exclusive? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java: ########## @@ -117,9 +136,43 @@ IndexFileMeta indexMeta(long logIndex) { return null; } + /** + * Removes all index metas that have log indices smaller than the given value. 
+ */ + void truncatePrefix(long firstLogIndexKept) { + Iterator<IndexMetaArrayHolder> it = fileMetaDeque.descendingIterator(); Review Comment: Please add a comment to `fileMetaDeque` to describe what its elements represent and in which order they are kept ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java: ########## @@ -104,22 +104,34 @@ void onRollover(SegmentFile segmentFile, ReadModeIndexMemTable indexMemTable) { /** * Searches for the segment payload corresponding to the given Raft Group ID and Raft Log Index in the checkpoint queue. - * - * @return {@code ByteBuffer} which position is set to the start of the corresponding segment payload or {@code null} if the payload has - * not been found in all files currently present in the queue. */ - @Nullable ByteBuffer findSegmentPayloadInQueue(long groupId, long logIndex) { + @Nullable EntrySearchResult findSegmentPayloadInQueue(long groupId, long logIndex) { Iterator<Entry> it = queue.tailIterator(); while (it.hasNext()) { Entry e = it.next(); SegmentInfo segmentInfo = e.memTable().segmentInfo(groupId); - int segmentPayloadOffset = segmentInfo == null ? 0 : segmentInfo.getOffset(logIndex); + if (segmentInfo == null) { + continue; + } + + if (segmentInfo.lastLogIndexExclusive() <= logIndex) { + return EntrySearchResult.empty(); + } + + if (segmentInfo.firstIndexKept() > logIndex) { + // This is a prefix tombstone and it cuts off the log index we search for. + return EntrySearchResult.empty(); + } + + int segmentPayloadOffset = segmentInfo.getOffset(logIndex); if (segmentPayloadOffset != 0) { Review Comment: How about introducing a constant for 0, called `INDEX_NOT_IN_SEGMENT`? 
########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java: ########## @@ -56,13 +58,23 @@ ArrayWithSize add(int element) { return new ArrayWithSize(array, size + 1); } - ArrayWithSize truncate(int newSize) { + ArrayWithSize truncateSuffix(int newSize) { + return truncate(0, newSize); + } + + ArrayWithSize truncatePrefix(int newSize) { + int srcPos = size - newSize; + + return truncate(srcPos, newSize); + } + + private ArrayWithSize truncate(int srcPos, int newSize) { assert newSize <= size : String.format("Array must shrink on truncation, current size: %d, size after truncation: %d", size, newSize); - int[] newArray = new int[size]; + int[] newArray = new int[array.length]; Review Comment: Do we allocate an array of the same size? Why? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java: ########## @@ -63,23 +69,40 @@ public void appendSegmentFileOffset(long groupId, long logIndex, int segmentFile @Override public SegmentInfo segmentInfo(long groupId) { - return stripe(groupId).memTable.get(groupId); + return memtable(groupId).get(groupId); } @Override public void truncateSuffix(long groupId, long lastLogIndexKept) { - ConcurrentMap<Long, SegmentInfo> memtable = stripe(groupId).memTable; - - SegmentInfo segmentInfo = memtable.get(groupId); + ConcurrentMap<Long, SegmentInfo> memtable = memtable(groupId); + + memtable.compute(groupId, (id, segmentInfo) -> { + if (segmentInfo == null || lastLogIndexKept < segmentInfo.firstLogIndexInclusive()) { + // If the current memtable does not have information for the given group or if we are truncating everything currently + // present in the memtable, we need to write a special "empty" SegmentInfo into the memtable to override existing persisted + // data during search. + return new SegmentInfo(lastLogIndexKept + 1); Review Comment: Why is 1 added here? 
########## modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java: ########## @@ -248,4 +249,107 @@ void testTruncateIntoThePast() { assertThat(memTable.segmentInfo(0).getOffset(12), is(0)); assertThat(memTable.segmentInfo(0).getOffset(36), is(0)); } + + @Test + void testTruncatePrefix() { + long groupId0 = 1; + long groupId1 = 2; Review Comment: ```suggestion long groupId1 = 1; long groupId2 = 2; ``` ########## modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java: ########## @@ -106,4 +103,26 @@ public void firstAndLastLogIndexAfterSuffixTruncateAndRestart(int numEntries) th assertThat(logStorage.getFirstLogIndex(), is(0L)); assertThat(logStorage.getLastLogIndex(), is(lastIndexKept)); } + + @ParameterizedTest + @ValueSource(ints = { 15, 100_000 }) + public void firstAndLastLogIndexAfterPrefixTruncateAndRestart(int numEntries) throws Exception { + logStorage.appendEntries(TestUtils.mockEntries(numEntries)); + + long lastIndexKept = numEntries / 2; Review Comment: ```suggestion long firstIndexKept = numEntries / 2; ``` ########## modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java: ########## @@ -137,12 +137,12 @@ void testReadFromQueue() { for (int groupId = 0; groupId < MAX_QUEUE_SIZE; groupId++) { for (int logIndex = 0; logIndex < MAX_QUEUE_SIZE; logIndex++) { - ByteBuffer payload = checkpointer.findSegmentPayloadInQueue(groupId, logIndex); + EntrySearchResult searchResult = checkpointer.findSegmentPayloadInQueue(groupId, logIndex); if (groupId == logIndex) { - assertThat(payload, is(notNullValue())); + assertThat(searchResult != null && !searchResult.isEmpty(), is(true)); Review Comment: Let's split this into 2 assertions ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java: ########## @@ -533,13 +531,21 @@ private static boolean endOfSegmentReached(ByteBuffer 
buffer) { * possibly incomplete segment file. */ private WriteModeIndexMemTable recoverLatestMemtable(SegmentFile segmentFile, Path segmentFilePath) { + return recoverMemtable(segmentFile, segmentFilePath, true); + } + + private WriteModeIndexMemTable recoverMemtable(SegmentFile segmentFile, Path segmentFilePath) { + return recoverMemtable(segmentFile, segmentFilePath, false); Review Comment: CRC is not validated here as this is not the latest segment and we are sure it's fully synced, right? Please add a comment about this ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/EntrySearchResult.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.segstore; + +import java.nio.ByteBuffer; +import org.jetbrains.annotations.Nullable; + +/** + * Class representing the result of an entry search in the log storage. 
+ * + * <p>It is used to represent three conditions: + * + * <ol> + * <li>If the corresponding method returns {@code null}, then the entry was not found and we may want to continue + * looking in other places;</li> + * <li>If the method returns an {@link #isEmpty} instance, then the entry was not found but we know for sure that it does + * not exist in the storage;</li> + * <li>If the method returns a non-empty instance, then the corresponding entry has been found successfully.</li> + * </ol> + */ +class EntrySearchResult { + private static final EntrySearchResult EMPTY = new EntrySearchResult(); + + @Nullable + private final ByteBuffer entryBuffer; + + @SuppressWarnings("NullableProblems") // We don't want to accept nulls here. Review Comment: So what are we suppressing if we don't want to accept nulls? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java: ########## @@ -106,10 +114,50 @@ IndexFileMeta find(long logIndex) { } else if (logIndex >= midValue.lastLogIndexExclusive()) { lowArrayIndex = middleArrayIndex + 1; } else { - return midValue; + return middleArrayIndex; } } - return null; + return -1; + } + + IndexFileMetaArray truncateIndicesSmallerThan(long firstLogIndexKept) { + int firstLogIndexKeptArrayIndex = findArrayIndex(firstLogIndexKept); + + assert firstLogIndexKeptArrayIndex >= 0 : String.format( + "Missing entry for log index %d in range [%d:%d).", + firstLogIndexKept, firstLogIndexInclusive(), lastLogIndexExclusive() + ); + + IndexFileMeta metaToUpdate = array[firstLogIndexKeptArrayIndex]; + + int numEntriesToSkip = toIntExact(firstLogIndexKept - metaToUpdate.firstLogIndexInclusive()); + + assert numEntriesToSkip >= 0 : String.format( + "Trying to do a no-op prefix truncate from index %d in range [%d:%d).", + firstLogIndexKept, firstLogIndexInclusive(), lastLogIndexExclusive() + ); + + // Move the payload offset pointer to skip truncated entries (each entry is 4 bytes). 
+ int adjustedPayloadOffset = metaToUpdate.indexFilePayloadOffset() + numEntriesToSkip * Integer.BYTES; + + var trimmedMeta = new IndexFileMeta( + firstLogIndexKept, + metaToUpdate.lastLogIndexExclusive(), + adjustedPayloadOffset, + metaToUpdate.indexFileOrdinal() + ); + + IndexFileMeta[] newArray = new IndexFileMeta[array.length]; + + newArray[0] = trimmedMeta; + + int newSize = size - firstLogIndexKeptArrayIndex; + + if (newSize > 1) { + System.arraycopy(array, firstLogIndexKeptArrayIndex + 1, newArray, 1, newSize - 1); Review Comment: ```suggestion if (newSize > 1) { // Copy elements following the one hosting firstLogIndexKept (which becomes the first one). System.arraycopy(array, firstLogIndexKeptArrayIndex + 1, newArray, 1, newSize - 1); ``` ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java: ########## @@ -293,33 +301,76 @@ private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable indexMemTabl long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive(); + long firstIndexKept = segmentInfo.firstIndexKept(); + // On recovery we are only creating missing index files, in-memory meta will be created on Index File Manager start. if (!onRecovery) { - var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, lastLogIndexExclusive, payloadOffset, fileOrdinal); + IndexFileMeta indexFileMeta = createIndexFileMeta( + firstLogIndexInclusive, lastLogIndexExclusive, firstIndexKept, payloadOffset, fileOrdinal + ); - putIndexFileMeta(groupId, indexFileMeta); + putIndexFileMeta(groupId, indexFileMeta, firstIndexKept); } headerBuffer .putLong(groupId) .putInt(0) // Flags. 
.putInt(payloadOffset) .putLong(firstLogIndexInclusive) - .putLong(lastLogIndexExclusive); + .putLong(lastLogIndexExclusive) + .putLong(firstIndexKept); payloadOffset += payloadSize(segmentInfo); } return headerBuffer.array(); } - private void putIndexFileMeta(Long groupId, IndexFileMeta indexFileMeta) { + private static @Nullable IndexFileMeta createIndexFileMeta( + long firstLogIndexInclusive, + long lastLogIndexExclusive, + long firstIndexKept, + int payloadOffset, + int fileOrdinal + ) { + if (firstLogIndexInclusive == -1) { + assert firstIndexKept != -1 : "Expected a prefix tombstone, but firstIndexKept is not set."; Review Comment: Let's create a constant called `PREFIX_TOMBSTONE` ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java: ########## @@ -293,33 +301,76 @@ private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable indexMemTabl long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive(); + long firstIndexKept = segmentInfo.firstIndexKept(); + // On recovery we are only creating missing index files, in-memory meta will be created on Index File Manager start. if (!onRecovery) { - var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, lastLogIndexExclusive, payloadOffset, fileOrdinal); + IndexFileMeta indexFileMeta = createIndexFileMeta( + firstLogIndexInclusive, lastLogIndexExclusive, firstIndexKept, payloadOffset, fileOrdinal + ); - putIndexFileMeta(groupId, indexFileMeta); + putIndexFileMeta(groupId, indexFileMeta, firstIndexKept); } headerBuffer .putLong(groupId) .putInt(0) // Flags. 
.putInt(payloadOffset) .putLong(firstLogIndexInclusive) - .putLong(lastLogIndexExclusive); + .putLong(lastLogIndexExclusive) + .putLong(firstIndexKept); payloadOffset += payloadSize(segmentInfo); } return headerBuffer.array(); } - private void putIndexFileMeta(Long groupId, IndexFileMeta indexFileMeta) { + private static @Nullable IndexFileMeta createIndexFileMeta( + long firstLogIndexInclusive, + long lastLogIndexExclusive, + long firstIndexKept, + int payloadOffset, + int fileOrdinal + ) { + if (firstLogIndexInclusive == -1) { + assert firstIndexKept != -1 : "Expected a prefix tombstone, but firstIndexKept is not set."; + + // This is a "prefix tombstone", no need to create any meta, we will just truncate the prefix. + return null; + } + + if (firstIndexKept == -1 || firstIndexKept <= firstLogIndexInclusive) { + // No prefix truncation required, simply create a new meta. + return new IndexFileMeta(firstLogIndexInclusive, lastLogIndexExclusive, payloadOffset, fileOrdinal); + } + + // Create a meta with a truncated prefix. + int numEntriesToSkip = toIntExact(firstIndexKept - firstLogIndexInclusive); + + int adjustedPayloadOffset = payloadOffset + numEntriesToSkip * Integer.BYTES; + + return new IndexFileMeta(firstIndexKept, lastLogIndexExclusive, adjustedPayloadOffset, fileOrdinal); + } + + private void putIndexFileMeta(Long groupId, @Nullable IndexFileMeta indexFileMeta, long firstIndexKept) { GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId); if (existingGroupIndexMeta == null) { - groupIndexMetas.put(groupId, new GroupIndexMeta(indexFileMeta)); + if (indexFileMeta != null) { + groupIndexMetas.put(groupId, new GroupIndexMeta(indexFileMeta)); + } } else { - existingGroupIndexMeta.addIndexMeta(indexFileMeta); + if (firstIndexKept != -1) { + existingGroupIndexMeta.truncatePrefix(firstIndexKept); + } + + if (indexFileMeta != null) { + // New index meta must have already been truncated according to the prefix tombstone. 
Review Comment: Why is it 'according to...'? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java: ########## @@ -106,10 +114,50 @@ IndexFileMeta find(long logIndex) { } else if (logIndex >= midValue.lastLogIndexExclusive()) { lowArrayIndex = middleArrayIndex + 1; } else { - return midValue; + return middleArrayIndex; } } - return null; + return -1; + } + + IndexFileMetaArray truncateIndicesSmallerThan(long firstLogIndexKept) { + int firstLogIndexKeptArrayIndex = findArrayIndex(firstLogIndexKept); + + assert firstLogIndexKeptArrayIndex >= 0 : String.format( + "Missing entry for log index %d in range [%d:%d).", + firstLogIndexKept, firstLogIndexInclusive(), lastLogIndexExclusive() + ); + + IndexFileMeta metaToUpdate = array[firstLogIndexKeptArrayIndex]; + + int numEntriesToSkip = toIntExact(firstLogIndexKept - metaToUpdate.firstLogIndexInclusive()); + + assert numEntriesToSkip >= 0 : String.format( + "Trying to do a no-op prefix truncate from index %d in range [%d:%d).", + firstLogIndexKept, firstLogIndexInclusive(), lastLogIndexExclusive() + ); + + // Move the payload offset pointer to skip truncated entries (each entry is 4 bytes). + int adjustedPayloadOffset = metaToUpdate.indexFilePayloadOffset() + numEntriesToSkip * Integer.BYTES; Review Comment: Are those 4 bytes the same 4 bytes used in `IndexFileManager`? Anyway, let's use a constant with a descriptive name ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java: ########## @@ -104,22 +104,34 @@ void onRollover(SegmentFile segmentFile, ReadModeIndexMemTable indexMemTable) { /** * Searches for the segment payload corresponding to the given Raft Group ID and Raft Log Index in the checkpoint queue. 
- * - * @return {@code ByteBuffer} which position is set to the start of the corresponding segment payload or {@code null} if the payload has - * not been found in all files currently present in the queue. */ - @Nullable ByteBuffer findSegmentPayloadInQueue(long groupId, long logIndex) { + @Nullable EntrySearchResult findSegmentPayloadInQueue(long groupId, long logIndex) { Iterator<Entry> it = queue.tailIterator(); while (it.hasNext()) { Entry e = it.next(); SegmentInfo segmentInfo = e.memTable().segmentInfo(groupId); - int segmentPayloadOffset = segmentInfo == null ? 0 : segmentInfo.getOffset(logIndex); + if (segmentInfo == null) { + continue; + } + + if (segmentInfo.lastLogIndexExclusive() <= logIndex) { Review Comment: Would it make sense to flip this and the following comparisons, putting `logIndex` on the left? It seems more natural to put 'the thing that we are checking' on the left and the boundary on the right ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java: ########## @@ -293,33 +301,76 @@ private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable indexMemTabl long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive(); + long firstIndexKept = segmentInfo.firstIndexKept(); + // On recovery we are only creating missing index files, in-memory meta will be created on Index File Manager start. if (!onRecovery) { - var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, lastLogIndexExclusive, payloadOffset, fileOrdinal); + IndexFileMeta indexFileMeta = createIndexFileMeta( + firstLogIndexInclusive, lastLogIndexExclusive, firstIndexKept, payloadOffset, fileOrdinal + ); - putIndexFileMeta(groupId, indexFileMeta); + putIndexFileMeta(groupId, indexFileMeta, firstIndexKept); } headerBuffer .putLong(groupId) .putInt(0) // Flags. 
.putInt(payloadOffset) .putLong(firstLogIndexInclusive) - .putLong(lastLogIndexExclusive); + .putLong(lastLogIndexExclusive) + .putLong(firstIndexKept); payloadOffset += payloadSize(segmentInfo); } return headerBuffer.array(); } - private void putIndexFileMeta(Long groupId, IndexFileMeta indexFileMeta) { + private static @Nullable IndexFileMeta createIndexFileMeta( + long firstLogIndexInclusive, + long lastLogIndexExclusive, + long firstIndexKept, + int payloadOffset, + int fileOrdinal + ) { + if (firstLogIndexInclusive == -1) { + assert firstIndexKept != -1 : "Expected a prefix tombstone, but firstIndexKept is not set."; + + // This is a "prefix tombstone", no need to create any meta, we will just truncate the prefix. + return null; + } + + if (firstIndexKept == -1 || firstIndexKept <= firstLogIndexInclusive) { + // No prefix truncation required, simply create a new meta. + return new IndexFileMeta(firstLogIndexInclusive, lastLogIndexExclusive, payloadOffset, fileOrdinal); + } + + // Create a meta with a truncated prefix. + int numEntriesToSkip = toIntExact(firstIndexKept - firstLogIndexInclusive); + + int adjustedPayloadOffset = payloadOffset + numEntriesToSkip * Integer.BYTES; Review Comment: Let's create a constant for Integer.BYTES here ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java: ########## @@ -556,11 +562,24 @@ private WriteModeIndexMemTable recoverLatestMemtable(SegmentFile segmentFile, Pa buffer.position(segmentFilePayloadOffset); // CRC violation signals the end of meaningful data in the segment file. - if (!isCrcValid(buffer, crcPosition)) { + if (validateCrc && !isCrcValid(buffer, crcPosition)) { Review Comment: This block is duplicated for each of 3 entry types. How about introducing the entry type explicitly in code? 
This would help to have the recovery code nicely packaged in 3 classes; this will probably allow us to get rid of this duplication and also make the object model more obvious ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java: ########## @@ -337,17 +364,22 @@ private WriteBufferWithMemtable reserveBytesWithRollover(int size) throws IOExce * storage, not taking pending in-memory state into account. */ long firstLogIndexInclusiveOnRecovery(long groupId) { + SegmentFileWithMemtable currentSegmentFile = this.currentSegmentFile.get(); + + SegmentInfo segmentInfo = currentSegmentFile.memtable().segmentInfo(groupId); + + // We need to consult with the latest memtable in case it contains a prefix tombstone. + if (segmentInfo != null && segmentInfo.firstIndexKept() != -1) { + return segmentInfo.firstIndexKept(); + } + long firstLogIndexFromIndexStorage = indexFileManager.firstLogIndexInclusive(groupId); Review Comment: Why do we look at the index if we have the latest memtable? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java: ########## @@ -168,29 +201,74 @@ int size() { void saveOffsetsTo(ByteBuffer buffer) { ArrayWithSize offsets = segmentFileOffsets; + assert offsets.size() > 0 : "Offsets array must not be empty"; + buffer.asIntBuffer().put(offsets.array, 0, offsets.size); } /** * Removes all data which log indices are strictly greater than {@code lastLogIndexKept}. 
*/ - void truncateSuffix(long lastLogIndexKept) { + SegmentInfo truncateSuffix(long lastLogIndexKept) { assert lastLogIndexKept >= logIndexBase : String.format("logIndexBase=%d, lastLogIndexKept=%d", logIndexBase, lastLogIndexKept); ArrayWithSize segmentFileOffsets = this.segmentFileOffsets; - long newSize = lastLogIndexKept - logIndexBase + 1; + long lastLogIndexExclusive = logIndexBase + segmentFileOffsets.size(); - // Not using an assertion here, because this value comes doesn't come from the storage code. - if (newSize > segmentFileOffsets.size()) { + // Not using an assertion here, because this value doesn't come from the storage code. + if (lastLogIndexKept >= lastLogIndexExclusive) { throw new IllegalArgumentException(String.format( "lastLogIndexKept is too large. Last index in memtable: %d, lastLogIndexKept: %d", - logIndexBase + segmentFileOffsets.size() - 1, lastLogIndexKept + lastLogIndexExclusive - 1, lastLogIndexKept )); } - ArrayWithSize newSegmentFileOffsets = segmentFileOffsets.truncate((int) newSize); + int newSize = toIntExact(lastLogIndexKept - logIndexBase + 1); + + setSegmentFileOffsets(segmentFileOffsets, segmentFileOffsets.truncateSuffix(newSize)); + + // This could have been a "void" method, but this way it looks consistent with "truncatePrefix". + return this; + } + + /** + * Removes all data which log indices are strictly smaller than {@code firstIndexKept}. 
+ */ + SegmentInfo truncatePrefix(long firstIndexKept) { + if (isPrefixTombstone()) { + if (this.firstIndexKept >= firstIndexKept) { Review Comment: Let's flip this comparison ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java: ########## @@ -168,29 +201,74 @@ int size() { void saveOffsetsTo(ByteBuffer buffer) { ArrayWithSize offsets = segmentFileOffsets; + assert offsets.size() > 0 : "Offsets array must not be empty"; + buffer.asIntBuffer().put(offsets.array, 0, offsets.size); } /** * Removes all data which log indices are strictly greater than {@code lastLogIndexKept}. */ - void truncateSuffix(long lastLogIndexKept) { + SegmentInfo truncateSuffix(long lastLogIndexKept) { assert lastLogIndexKept >= logIndexBase : String.format("logIndexBase=%d, lastLogIndexKept=%d", logIndexBase, lastLogIndexKept); ArrayWithSize segmentFileOffsets = this.segmentFileOffsets; - long newSize = lastLogIndexKept - logIndexBase + 1; + long lastLogIndexExclusive = logIndexBase + segmentFileOffsets.size(); - // Not using an assertion here, because this value comes doesn't come from the storage code. - if (newSize > segmentFileOffsets.size()) { + // Not using an assertion here, because this value doesn't come from the storage code. + if (lastLogIndexKept >= lastLogIndexExclusive) { throw new IllegalArgumentException(String.format( "lastLogIndexKept is too large. Last index in memtable: %d, lastLogIndexKept: %d", - logIndexBase + segmentFileOffsets.size() - 1, lastLogIndexKept + lastLogIndexExclusive - 1, lastLogIndexKept )); } - ArrayWithSize newSegmentFileOffsets = segmentFileOffsets.truncate((int) newSize); + int newSize = toIntExact(lastLogIndexKept - logIndexBase + 1); + + setSegmentFileOffsets(segmentFileOffsets, segmentFileOffsets.truncateSuffix(newSize)); + + // This could have been a "void" method, but this way it looks consistent with "truncatePrefix". + return this; Review Comment: Why is this consistency important? 
Would it be better to make it void, but explain in a comment why the methods look different? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java: ########## @@ -40,12 +40,21 @@ class SegmentPayload { /** * Length of the byte sequence that is written when suffix truncation happens. * - * <p>Format: {@code groupId, 0 (special length value), last kept index, crc} + * <p>Format: {@code groupId, TRUNCATE_SUFFIX_RECORD_MARKER (special length value), last kept index, crc} */ static final int TRUNCATE_SUFFIX_RECORD_SIZE = GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + Long.BYTES + HASH_SIZE_BYTES; Review Comment: Sometimes it's CRC, sometimes it's 'hash'. Those are technically different terms. Would it make sense to stick with just one of them for consistency? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java: ########## @@ -168,29 +201,74 @@ int size() { void saveOffsetsTo(ByteBuffer buffer) { ArrayWithSize offsets = segmentFileOffsets; + assert offsets.size() > 0 : "Offsets array must not be empty"; + buffer.asIntBuffer().put(offsets.array, 0, offsets.size); } /** * Removes all data which log indices are strictly greater than {@code lastLogIndexKept}. */ - void truncateSuffix(long lastLogIndexKept) { + SegmentInfo truncateSuffix(long lastLogIndexKept) { assert lastLogIndexKept >= logIndexBase : String.format("logIndexBase=%d, lastLogIndexKept=%d", logIndexBase, lastLogIndexKept); ArrayWithSize segmentFileOffsets = this.segmentFileOffsets; - long newSize = lastLogIndexKept - logIndexBase + 1; + long lastLogIndexExclusive = logIndexBase + segmentFileOffsets.size(); - // Not using an assertion here, because this value comes doesn't come from the storage code. - if (newSize > segmentFileOffsets.size()) { + // Not using an assertion here, because this value doesn't come from the storage code. 
+ if (lastLogIndexKept >= lastLogIndexExclusive) { throw new IllegalArgumentException(String.format( "lastLogIndexKept is too large. Last index in memtable: %d, lastLogIndexKept: %d", - logIndexBase + segmentFileOffsets.size() - 1, lastLogIndexKept + lastLogIndexExclusive - 1, lastLogIndexKept )); } - ArrayWithSize newSegmentFileOffsets = segmentFileOffsets.truncate((int) newSize); + int newSize = toIntExact(lastLogIndexKept - logIndexBase + 1); + + setSegmentFileOffsets(segmentFileOffsets, segmentFileOffsets.truncateSuffix(newSize)); + + // This could have been a "void" method, but this way it looks consistent with "truncatePrefix". + return this; + } + + /** + * Removes all data which log indices are strictly smaller than {@code firstIndexKept}. + */ + SegmentInfo truncatePrefix(long firstIndexKept) { + if (isPrefixTombstone()) { + if (this.firstIndexKept >= firstIndexKept) { + throw new IllegalStateException(String.format( + "Trying to truncate an already truncated prefix [curFirstIndexKept=%d, newFirstIndexKept=%d]", + this.firstIndexKept, firstIndexKept + )); + } + + return prefixTombstone(firstIndexKept); + } + + ArrayWithSize segmentFileOffsets = this.segmentFileOffsets; + + if (firstIndexKept < logIndexBase) { + return new SegmentInfo(logIndexBase, firstIndexKept, segmentFileOffsets); + } Review Comment: What does this case mean? 
########## modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java: ########## @@ -137,12 +137,12 @@ void testReadFromQueue() { for (int groupId = 0; groupId < MAX_QUEUE_SIZE; groupId++) { for (int logIndex = 0; logIndex < MAX_QUEUE_SIZE; logIndex++) { - ByteBuffer payload = checkpointer.findSegmentPayloadInQueue(groupId, logIndex); + EntrySearchResult searchResult = checkpointer.findSegmentPayloadInQueue(groupId, logIndex); if (groupId == logIndex) { - assertThat(payload, is(notNullValue())); + assertThat(searchResult != null && !searchResult.isEmpty(), is(true)); } else { - assertThat(payload, is(nullValue())); + assertThat(searchResult == null || searchResult.isEmpty(), is(true)); Review Comment: Two assertions again ########## modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java: ########## @@ -195,22 +195,83 @@ void testOneWriterMultipleReadersWithOverlaps() { expectedFileOrdinal ); - var expectedMetaWithoutOverlap = new IndexFileMeta( - expectedFirstLogIndex + overlap - logEntriesPerFile, - expectedFirstLogIndex + overlap - 1, - 0, - expectedFileOrdinal - 1 - ); - - // We can possibly be reading from two different metas - from the newer one (that overlaps the older one) or - // the older one. - assertThat( - logIndex + " -> " + indexFileMeta, - indexFileMeta, either(is(expectedMetaWithOverlap)).or(is(expectedMetaWithoutOverlap))); + if (expectedFirstLogIndex == 0) { + assertThat(logIndex + " -> " + indexFileMeta, indexFileMeta, is(expectedMetaWithOverlap)); + } else { + var expectedMetaWithoutOverlap = new IndexFileMeta( + expectedFirstLogIndex + overlap - logEntriesPerFile, + expectedFirstLogIndex + overlap - 1, + 0, + expectedFileOrdinal - 1 + ); + + // We can possibly be reading from two different metas - from the newer one (that overlaps the older one) or + // the older one. 
+ assertThat( + logIndex + " -> " + indexFileMeta, + indexFileMeta, either(is(expectedMetaWithOverlap)).or(is(expectedMetaWithoutOverlap)) + ); + } } } }; runRace(writer, reader, reader, reader); } + + @Test + void testTruncatePrefix() { + var meta1 = new IndexFileMeta(1, 100, 0, 0); + var meta2 = new IndexFileMeta(42, 100, 42, 1); + var meta3 = new IndexFileMeta(100, 120, 66, 2); + var meta4 = new IndexFileMeta(110, 200, 95, 3); + + var groupMeta = new GroupIndexMeta(meta1); + + groupMeta.addIndexMeta(meta2); + groupMeta.addIndexMeta(meta3); + groupMeta.addIndexMeta(meta4); + + assertThat(groupMeta.firstLogIndexInclusive(), is(1L)); + assertThat(groupMeta.lastLogIndexExclusive(), is(200L)); + + assertThat(groupMeta.indexMeta(10), is(meta1)); + assertThat(groupMeta.indexMeta(43), is(meta2)); Review Comment: ```suggestion assertThat(groupMeta.indexMeta(42), is(meta2)); ``` ########## modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java: ########## @@ -155,4 +155,64 @@ void testReadFromQueue() { // The queue should eventually become empty again. await().until(() -> checkpointer.findSegmentPayloadInQueue(0, 0), is(nullValue())); } + + @Test + void testFindSegmentPayloadReturnsBufferWhenOffsetPresent(@Mock SegmentFile mockFile, @Mock IndexMemTable mockMemTable) { + var blockFuture = new CompletableFuture<Void>(); + + try { + doAnswer(invocation -> blockFuture.join()).when(mockFile).sync(); + + ByteBuffer buffer = ByteBuffer.allocate(16); + + when(mockFile.buffer()).thenReturn(buffer); + + long groupId = 2; + long logIndex = 5; + + SegmentInfo mockSegmentInfo = mock(SegmentInfo.class); Review Comment: Can a real object be used instead of this mock? 
########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java: ########## @@ -293,33 +301,76 @@ private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable indexMemTabl long lastLogIndexExclusive = segmentInfo.lastLogIndexExclusive(); + long firstIndexKept = segmentInfo.firstIndexKept(); + // On recovery we are only creating missing index files, in-memory meta will be created on Index File Manager start. if (!onRecovery) { - var indexFileMeta = new IndexFileMeta(firstLogIndexInclusive, lastLogIndexExclusive, payloadOffset, fileOrdinal); + IndexFileMeta indexFileMeta = createIndexFileMeta( + firstLogIndexInclusive, lastLogIndexExclusive, firstIndexKept, payloadOffset, fileOrdinal + ); - putIndexFileMeta(groupId, indexFileMeta); + putIndexFileMeta(groupId, indexFileMeta, firstIndexKept); } headerBuffer .putLong(groupId) .putInt(0) // Flags. .putInt(payloadOffset) .putLong(firstLogIndexInclusive) - .putLong(lastLogIndexExclusive); + .putLong(lastLogIndexExclusive) + .putLong(firstIndexKept); payloadOffset += payloadSize(segmentInfo); } return headerBuffer.array(); } - private void putIndexFileMeta(Long groupId, IndexFileMeta indexFileMeta) { + private static @Nullable IndexFileMeta createIndexFileMeta( + long firstLogIndexInclusive, + long lastLogIndexExclusive, + long firstIndexKept, + int payloadOffset, + int fileOrdinal + ) { + if (firstLogIndexInclusive == -1) { + assert firstIndexKept != -1 : "Expected a prefix tombstone, but firstIndexKept is not set."; Review Comment: For another `-1` it could be `NO_INDEX` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
