This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b0ac00a3960 IGNITE-27964 Use index-based approach in
RaftLogGarbageCollector (#7748)
b0ac00a3960 is described below
commit b0ac00a3960b969b376a1aaf086108f1a111096d
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Mar 12 16:39:52 2026 +0200
IGNITE-27964 Use index-based approach in RaftLogGarbageCollector (#7748)
---
.../raft/storage/segstore/AbstractMemTable.java | 12 +-
.../raft/storage/segstore/GroupIndexMeta.java | 24 ++-
.../raft/storage/segstore/GroupInfoProvider.java | 50 ------
.../raft/storage/segstore/IndexFileManager.java | 60 ++++++-
.../raft/storage/segstore/IndexFileMetaArray.java | 30 +++-
.../storage/segstore/RaftLogGarbageCollector.java | 179 ++++++++++---------
.../raft/storage/segstore/SegmentFileManager.java | 3 +-
.../storage/segstore/AbstractMemTableTest.java | 23 +++
.../raft/storage/segstore/GroupIndexMetaTest.java | 190 +++++++++++++++-----
.../segstore/RaftLogGarbageCollectorTest.java | 196 ++++++++++++---------
.../segstore/SegmentFileManagerGetEntryTest.java | 1 -
.../storage/segstore/SegmentFileManagerTest.java | 1 -
.../SegstoreLogStorageConcurrencyTest.java | 1 -
.../storage/segstore/SegstoreLogStorageTest.java | 1 -
14 files changed, 490 insertions(+), 281 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTable.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTable.java
index ad66f091c1b..1f5551b0864 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTable.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTable.java
@@ -56,13 +56,13 @@ abstract class AbstractMemTable implements
WriteModeIndexMemTable, ReadModeIndex
Map<Long, SegmentInfo> memtable = memtable(groupId);
memtable.compute(groupId, (id, segmentInfo) -> {
- if (segmentInfo == null || lastLogIndexKept <
segmentInfo.firstLogIndexInclusive()) {
- // If the current memtable does not have information for the
given group or if we are truncating everything currently
- // present in the memtable, we need to write a special "empty"
SegmentInfo into the memtable to override existing persisted
- // data during search.
+ if (segmentInfo == null) {
+ // If the current memtable does not have information for the
given group, we need to write a special "empty" SegmentInfo
+ // into the memtable to override existing persisted data
during search.
return new SegmentInfo(lastLogIndexKept + 1);
- } else if (segmentInfo.isPrefixTombstone()) {
- // This is a prefix tombstone inserted by "truncatePrefix".
+ } else if (segmentInfo.isPrefixTombstone() || lastLogIndexKept <
segmentInfo.firstLogIndexInclusive()) {
+ // This is either a prefix tombstone inserted by
"truncatePrefix" or we are removing all data from this segment file
+ // about this group.
return new SegmentInfo(lastLogIndexKept + 1,
segmentInfo.firstIndexKept());
} else {
return segmentInfo.truncateSuffix(lastLogIndexKept);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
index c7fce3222a8..ae386a63224 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
@@ -123,7 +123,14 @@ class GroupIndexMeta {
}
void addIndexMeta(IndexFileMeta indexFileMeta) {
- IndexMetaArrayHolder curFileMetas = fileMetaDeque.getLast();
+ IndexMetaArrayHolder curFileMetas = fileMetaDeque.peekLast();
+
+ // Deque may be empty due to prefix truncation.
+ if (curFileMetas == null) {
+ fileMetaDeque.add(new IndexMetaArrayHolder(indexFileMeta));
+
+ return;
+ }
long curLastLogIndex = curFileMetas.lastLogIndexExclusive();
@@ -159,7 +166,7 @@ class GroupIndexMeta {
* is not found in any of the index files in this group.
*/
@Nullable
- IndexFileMeta indexMeta(long logIndex) {
+ IndexFileMeta indexMetaByLogIndex(long logIndex) {
Iterator<IndexMetaArrayHolder> it = fileMetaDeque.descendingIterator();
while (it.hasNext()) {
@@ -184,6 +191,19 @@ class GroupIndexMeta {
return null;
}
+ @Nullable
+ IndexFileMeta indexMetaByFileOrdinal(int fileOrdinal) {
+ for (IndexMetaArrayHolder indexMetaArrayHolder : fileMetaDeque) {
+ IndexFileMeta indexMeta =
indexMetaArrayHolder.fileMetas.findByFileOrdinal(fileOrdinal);
+
+ if (indexMeta != null) {
+ return indexMeta;
+ }
+ }
+
+ return null;
+ }
+
/**
* Removes all index metas that have log indices smaller than the given
value.
*/
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupInfoProvider.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupInfoProvider.java
deleted file mode 100644
index 186a06d31fa..00000000000
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupInfoProvider.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.raft.storage.segstore;
-
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Provides information about a Raft group present in the storage.
- */
-@FunctionalInterface
-interface GroupInfoProvider {
- GroupInfoProvider NO_OP = groupId -> null;
-
- class GroupInfo {
- private final long firstLogIndexInclusive;
-
- private final long lastLogIndexExclusive;
-
- GroupInfo(long firstLogIndexInclusive, long lastLogIndexExclusive) {
- this.firstLogIndexInclusive = firstLogIndexInclusive;
- this.lastLogIndexExclusive = lastLogIndexExclusive;
- }
-
- long firstLogIndexInclusive() {
- return firstLogIndexInclusive;
- }
-
- long lastLogIndexExclusive() {
- return lastLogIndexExclusive;
- }
- }
-
- @Nullable
- GroupInfo groupInfo(long groupId);
-}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index 7ebd05cc953..ab7d8d0a2e6 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -23,6 +23,8 @@ import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
import static org.apache.ignite.internal.util.IgniteUtils.fsyncFile;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
import java.io.BufferedInputStream;
@@ -277,7 +279,7 @@ class IndexFileManager {
return null;
}
- IndexFileMeta indexFileMeta = groupIndexMeta.indexMeta(logIndex);
+ IndexFileMeta indexFileMeta =
groupIndexMeta.indexMetaByLogIndex(logIndex);
if (indexFileMeta == null) {
return null;
@@ -340,6 +342,40 @@ class IndexFileManager {
return indexFilesDir.resolve(indexFileName(fileProperties));
}
+ /**
+ * Returns information about a segment file (identified by its ordinal) as
a [groupId -> descriptor] mapping.
+ *
+ * <p>If all entries from the file for a given group have been
<i>logically</i> removed, for example, as a result of later prefix
+ * truncation, then the mapping will not contain an entry for this group.
Otherwise, it will contain the smallest and largest log
+ * indices across all index files for this group.
+ */
+ Long2ObjectMap<GroupDescriptor> describeSegmentFile(int fileOrdinal) {
+ var result = new
Long2ObjectOpenHashMap<GroupDescriptor>(groupIndexMetas.size());
+
+ groupIndexMetas.forEach((groupId, groupIndexMeta) -> {
+ IndexFileMeta indexFileMeta =
groupIndexMeta.indexMetaByFileOrdinal(fileOrdinal);
+
+ if (indexFileMeta != null) {
+ // Even if index meta exists, it may still be obsolete due to
suffix truncations (during suffix truncations we do not trim
+ // the meta as during prefix truncations, but add a new meta
block).
+ long firstLogIndexInclusive =
groupIndexMeta.firstLogIndexInclusive();
+
+ long lastLogIndexExclusive =
groupIndexMeta.lastLogIndexExclusive();
+
+ boolean isObsolete = indexFileMeta.firstLogIndexInclusive() >=
lastLogIndexExclusive
+ || indexFileMeta.lastLogIndexExclusive() <=
firstLogIndexInclusive;
+
+ if (!isObsolete) {
+ var groupDescriptor = new
GroupDescriptor(firstLogIndexInclusive, lastLogIndexExclusive);
+
+ result.put((long) groupId, groupDescriptor);
+ }
+ }
+ });
+
+ return result;
+ }
+
private static FileHeaderWithIndexMetas serializeHeaderAndFillMetadata(
ReadModeIndexMemTable indexMemTable,
FileProperties fileProperties
@@ -623,4 +659,26 @@ class IndexFileManager {
return firstIndexKept;
}
}
+
+ /** Class that provides information about a Raft group. */
+ static class GroupDescriptor {
+ /** First log index for the group across all files (inclusive). */
+ private final long firstLogIndexInclusive;
+
+ /** Last log index for the group across all files (exclusive). */
+ private final long lastLogIndexExclusive;
+
+ private GroupDescriptor(long firstLogIndexInclusive, long
lastLogIndexExclusive) {
+ this.firstLogIndexInclusive = firstLogIndexInclusive;
+ this.lastLogIndexExclusive = lastLogIndexExclusive;
+ }
+
+ long firstLogIndexInclusive() {
+ return firstLogIndexInclusive;
+ }
+
+ long lastLogIndexExclusive() {
+ return lastLogIndexExclusive;
+ }
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
index 5801bee7a16..44484b5e938 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
@@ -29,6 +29,8 @@ import org.jetbrains.annotations.Nullable;
* <p>Reads from multiple threads are thread-safe, but writes are expected to
be done from a single thread only.
*/
class IndexFileMetaArray {
+ private static final int MISSING_ARRAY_INDEX = -1;
+
static final int INITIAL_CAPACITY = 10;
private final IndexFileMeta[] array;
@@ -165,14 +167,9 @@ class IndexFileMetaArray {
}
IndexFileMetaArray onIndexCompacted(FileProperties oldProperties,
IndexFileMeta newMeta) {
- // Find index meta associated with the file being compacted.
- int smallestOrdinal = array[0].indexFileProperties().ordinal();
-
- assert oldProperties.ordinal() >= smallestOrdinal;
-
- int updateIndex = oldProperties.ordinal() - smallestOrdinal;
+ int updateIndex = arrayIndexByFileOrdinal(oldProperties.ordinal());
- if (updateIndex >= size) {
+ if (updateIndex == MISSING_ARRAY_INDEX) {
return this;
}
@@ -208,6 +205,25 @@ class IndexFileMetaArray {
return new IndexFileMetaArray(newArray, size);
}
+ @Nullable
+ IndexFileMeta findByFileOrdinal(int fileOrdinal) {
+ int arrayIndex = arrayIndexByFileOrdinal(fileOrdinal);
+
+ return arrayIndex == MISSING_ARRAY_INDEX ? null : array[arrayIndex];
+ }
+
+ private int arrayIndexByFileOrdinal(int fileOrdinal) {
+ int smallestOrdinal = array[0].indexFileProperties().ordinal();
+
+ if (fileOrdinal < smallestOrdinal) {
+ return MISSING_ARRAY_INDEX;
+ }
+
+ int arrayIndex = fileOrdinal - smallestOrdinal;
+
+ return arrayIndex >= size ? MISSING_ARRAY_INDEX : arrayIndex;
+ }
+
@Override
public String toString() {
return Arrays.toString(array);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
index f4fc31cf654..9402a0ae0f1 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
@@ -23,11 +23,14 @@ import static java.nio.file.StandardOpenOption.WRITE;
import static
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.indexFileProperties;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.HEADER_RECORD;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.CRC_SIZE_BYTES;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.RESET_RECORD_MARKER;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_PREFIX_RECORD_MARKER;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_SUFFIX_RECORD_MARKER;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.endOfSegmentReached;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.validateSegmentFileHeader;
import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -37,10 +40,9 @@ import java.nio.file.Path;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
-import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import
org.apache.ignite.internal.raft.storage.segstore.GroupInfoProvider.GroupInfo;
+import
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.GroupDescriptor;
import org.apache.ignite.internal.raft.util.VarlenEncoder;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.jetbrains.annotations.VisibleForTesting;
@@ -49,8 +51,8 @@ import org.jetbrains.annotations.VisibleForTesting;
* Garbage Collector for Raft log segment files.
*
* <p>The garbage collector performs compaction of segment files by removing
truncated log entries and creating new generations
- * of segment files. This process reclaims disk space occupied by log entries
that have been truncated via
- * {@link LogStorage#truncatePrefix} or {@link LogStorage#truncateSuffix}
operations.
+ * of segment files. This process reclaims disk space occupied by log entries
that have been truncated via {@link LogStorage#truncatePrefix}
+ * or {@link LogStorage#truncateSuffix} operations.
*
* <h2>Compaction Process</h2>
* When a segment file is selected for compaction, the GC:
@@ -70,18 +72,11 @@ class RaftLogGarbageCollector {
private final IndexFileManager indexFileManager;
- private final GroupInfoProvider groupInfoProvider;
+ private final AtomicLong logSizeBytes = new AtomicLong();
- private final AtomicLong logSize = new AtomicLong();
-
- RaftLogGarbageCollector(
- Path segmentFilesDir,
- IndexFileManager indexFileManager,
- GroupInfoProvider groupInfoProvider
- ) {
+ RaftLogGarbageCollector(Path segmentFilesDir, IndexFileManager
indexFileManager) {
this.segmentFilesDir = segmentFilesDir;
this.indexFileManager = indexFileManager;
- this.groupInfoProvider = groupInfoProvider;
}
void cleanupLeftoverFiles() throws IOException {
@@ -142,23 +137,61 @@ class RaftLogGarbageCollector {
}
}
- // TODO: Optimize compaction of completely truncated files, see
https://issues.apache.org/jira/browse/IGNITE-27964.
@VisibleForTesting
- void compactSegmentFile(SegmentFile segmentFile) throws IOException {
+ void runCompaction(SegmentFile segmentFile) throws IOException {
LOG.info("Compacting segment file [path = {}].", segmentFile.path());
- // Cache for avoiding excessive min/max log index computations.
- var logStorageInfos = new Long2ObjectOpenHashMap<GroupInfo>();
+ Long2ObjectMap<GroupDescriptor> segmentFileDescription
+ =
indexFileManager.describeSegmentFile(segmentFile.fileProperties().ordinal());
+
+ boolean canRemoveSegmentFile = segmentFileDescription.isEmpty();
+
+ Path indexFilePath =
indexFileManager.indexFilePath(segmentFile.fileProperties());
+
+ long logSizeDelta;
+
+ if (canRemoveSegmentFile) {
+ logSizeDelta = Files.size(segmentFile.path()) +
Files.size(indexFilePath);
+ } else {
+ logSizeDelta = compactSegmentFile(segmentFile, indexFilePath,
segmentFileDescription);
+ }
+ // Remove the previous generation of the segment file and its index.
This is safe to do, because we rely on the file system
+ // guarantees that other threads reading from the segment file will
still be able to do that even if the file is deleted.
+ Files.delete(segmentFile.path());
+ Files.delete(indexFilePath);
+
+ long newLogSize = logSizeBytes.addAndGet(-logSizeDelta);
+
+ if (LOG.isInfoEnabled()) {
+ if (canRemoveSegmentFile) {
+ LOG.info(
+ "Segment file removed (all entries are truncated)
[path = {}, log size freed = {} bytes, new log size = {} bytes].",
+ segmentFile.path(), logSizeDelta, newLogSize
+ );
+ } else {
+ LOG.info(
+ "Segment file compacted [path = {}, log size freed =
{} bytes, new log size = {} bytes].",
+ segmentFile.path(), logSizeDelta, newLogSize
+ );
+ }
+ }
+ }
+
+ private long compactSegmentFile(
+ SegmentFile segmentFile,
+ Path indexFilePath,
+ Long2ObjectMap<GroupDescriptor> segmentFileDescription
+ ) throws IOException {
ByteBuffer buffer = segmentFile.buffer();
validateSegmentFileHeader(buffer, segmentFile.path());
- TmpSegmentFile tmpSegmentFile = null;
+ try (var tmpSegmentFile = new TmpSegmentFile(segmentFile)) {
+ tmpSegmentFile.writeHeader();
- WriteModeIndexMemTable tmpMemTable = null;
+ var tmpMemTable = new SingleThreadMemTable();
- try {
while (!endOfSegmentReached(buffer)) {
int originalStartOfRecordOffset = buffer.position();
@@ -167,11 +200,31 @@ class RaftLogGarbageCollector {
int payloadLength = buffer.getInt();
if (payloadLength <= 0) {
- // Skip special entries (such as truncation records). They
can always be omitted.
- // To identify such entries we rely on the fact that they
have nonpositive length field value.
- int endOfRecordOffset = buffer.position() + Long.BYTES +
CRC_SIZE_BYTES;
+ switch (payloadLength) {
+ case TRUNCATE_SUFFIX_RECORD_MARKER:
+ long lastLogIndexKept = buffer.getLong();
- buffer.position(endOfRecordOffset);
+ tmpMemTable.truncateSuffix(groupId,
lastLogIndexKept);
+
+ break;
+ case TRUNCATE_PREFIX_RECORD_MARKER:
+ long firstLogIndexKept = buffer.getLong();
+
+ tmpMemTable.truncatePrefix(groupId,
firstLogIndexKept);
+
+ break;
+
+ case RESET_RECORD_MARKER:
+ long nextLogIndex = buffer.getLong();
+
+ tmpMemTable.reset(groupId, nextLogIndex);
+
+ break;
+ default:
+ throw new
IllegalStateException(String.format("Unknown record marker [payloadLength =
%d].]", payloadLength));
+ }
+
+ buffer.position(buffer.position() + CRC_SIZE_BYTES);
continue;
}
@@ -180,29 +233,20 @@ class RaftLogGarbageCollector {
long index = VarlenEncoder.readLong(buffer);
- GroupInfo info = logStorageInfos.computeIfAbsent(groupId,
groupInfoProvider::groupInfo);
+ GroupDescriptor groupDescriptor =
segmentFileDescription.get(groupId);
- if (info == null || index < info.firstLogIndexInclusive() ||
index >= info.lastLogIndexExclusive()) {
+ if (groupDescriptor == null || !isLogIndexInRange(index,
groupDescriptor)) {
// We found a truncated entry, it should be skipped.
buffer.position(endOfRecordOffset);
continue;
}
- if (tmpSegmentFile == null) {
- tmpSegmentFile = new TmpSegmentFile(segmentFile);
-
- tmpSegmentFile.writeHeader();
-
- tmpMemTable = new SingleThreadMemTable();
- }
-
int oldLimit = buffer.limit();
// Set the buffer boundaries to only write the current record
to the new file.
buffer.position(originalStartOfRecordOffset).limit(endOfRecordOffset);
- @SuppressWarnings("resource")
long newStartOfRecordOffset =
tmpSegmentFile.fileChannel().position();
writeFully(tmpSegmentFile.fileChannel(), buffer);
@@ -212,53 +256,22 @@ class RaftLogGarbageCollector {
tmpMemTable.appendSegmentFileOffset(groupId, index,
toIntExact(newStartOfRecordOffset));
}
- Path indexFilePath =
indexFileManager.indexFilePath(segmentFile.fileProperties());
-
- long logSizeDelta;
-
- if (tmpSegmentFile != null) {
- tmpSegmentFile.syncAndRename();
-
- // Create a new index file and update the in-memory state to
point to it.
- Path newIndexFilePath = indexFileManager.onIndexFileCompacted(
- tmpMemTable.transitionToReadMode(),
- segmentFile.fileProperties(),
- tmpSegmentFile.fileProperties()
- );
+ assert tmpMemTable.numGroups() != 0
+ : String.format("All entries have been truncated, this
should not happen [path = %s].", segmentFile.path());
- logSizeDelta = Files.size(segmentFile.path())
- + Files.size(indexFilePath)
- - tmpSegmentFile.size()
- - Files.size(newIndexFilePath);
- } else {
- // We got lucky and the whole file can be removed.
- logSizeDelta = Files.size(segmentFile.path()) +
Files.size(indexFilePath);
- }
+ tmpSegmentFile.syncAndRename();
- // Remove the previous generation of the segment file and its
index. This is safe to do, because we rely on the file system
- // guarantees that other threads reading from the segment file
will still be able to do that even if the file is deleted.
- Files.delete(segmentFile.path());
- Files.delete(indexFilePath);
+ // Create a new index file and update the in-memory state to point
to it.
+ Path newIndexFilePath = indexFileManager.onIndexFileCompacted(
+ tmpMemTable.transitionToReadMode(),
+ segmentFile.fileProperties(),
+ tmpSegmentFile.fileProperties()
+ );
- long newLogSize = logSize.addAndGet(-logSizeDelta);
-
- if (LOG.isInfoEnabled()) {
- if (tmpSegmentFile == null) {
- LOG.info(
- "Segment file removed (all entries are truncated)
[path = {}, log size freed = {}, new log size = {}].",
- segmentFile.path(), logSizeDelta, newLogSize
- );
- } else {
- LOG.info(
- "Segment file compacted [path = {}, log size freed
= {}, new log size = {}].",
- segmentFile.path(), logSizeDelta, newLogSize
- );
- }
- }
- } finally {
- if (tmpSegmentFile != null) {
- tmpSegmentFile.close();
- }
+ return Files.size(segmentFile.path())
+ + Files.size(indexFilePath)
+ - tmpSegmentFile.size()
+ - Files.size(newIndexFilePath);
}
}
@@ -268,7 +281,11 @@ class RaftLogGarbageCollector {
}
}
- private class TmpSegmentFile implements ManuallyCloseable {
+ private static boolean isLogIndexInRange(long index, GroupDescriptor
groupDescriptor) {
+ return index >= groupDescriptor.firstLogIndexInclusive() && index <
groupDescriptor.lastLogIndexExclusive();
+ }
+
+ private class TmpSegmentFile implements AutoCloseable {
private final String fileName;
private final Path tmpFilePath;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index cf1195a1621..a0b9d440d62 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -159,7 +159,6 @@ class SegmentFileManager implements ManuallyCloseable {
Path baseDir,
int stripes,
FailureProcessor failureProcessor,
- GroupInfoProvider groupInfoProvider,
RaftConfiguration raftConfiguration,
LogStorageConfiguration storageConfiguration
) throws IOException {
@@ -184,7 +183,7 @@ class SegmentFileManager implements ManuallyCloseable {
logStorageView.maxCheckpointQueueSize()
);
- garbageCollector = new RaftLogGarbageCollector(segmentFilesDir,
indexFileManager, groupInfoProvider);
+ garbageCollector = new RaftLogGarbageCollector(segmentFilesDir,
indexFileManager);
}
void start() throws IOException {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
index f97031939cf..0dffd7c8f29 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
@@ -203,6 +203,29 @@ abstract class AbstractMemTableTest<T extends
WriteModeIndexMemTable & ReadModeI
assertThat(segmentInfo.getOffset(1), is(43));
}
+ @Test
+ void testTruncateSuffixBelowFirstLogIndexPreservesFirstIndexKept() {
+ // Append a single entry at index 15, so firstLogIndexInclusive = 15.
+ memTable.appendSegmentFileOffset(0, 15, 42);
+
+ // Truncate the prefix using an index that is not present in the
memtable.
+ memTable.truncatePrefix(0, 10);
+
+ SegmentInfo afterPrefixTruncate = memTable.segmentInfo(0);
+ assertThat(afterPrefixTruncate, is(notNullValue()));
+ assertThat(afterPrefixTruncate.firstIndexKept(), is(10L));
+
+ // Truncate the suffix using an index that is also not present in the
memtable.
+ memTable.truncateSuffix(0, 14);
+
+ SegmentInfo afterSuffixTruncate = memTable.segmentInfo(0);
+ assertThat(afterSuffixTruncate, is(notNullValue()));
+
+ // The entry at index 15 must no longer be visible.
+ assertThat(afterSuffixTruncate.getOffset(15),
is(MISSING_SEGMENT_FILE_OFFSET));
+ assertThat(afterSuffixTruncate.firstIndexKept(), is(10L));
+ }
+
@Test
void testTruncateSuffixIntoThePast() {
memTable.appendSegmentFileOffset(0, 36, 42);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
index 235e2e2063b..e210910b96c 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
@@ -42,15 +42,15 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
groupMeta.addIndexMeta(additionalMeta);
- assertThat(groupMeta.indexMeta(0), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(0), is(nullValue()));
- assertThat(groupMeta.indexMeta(1), is(initialMeta));
+ assertThat(groupMeta.indexMetaByLogIndex(1), is(initialMeta));
- assertThat(groupMeta.indexMeta(50), is(additionalMeta));
+ assertThat(groupMeta.indexMetaByLogIndex(50), is(additionalMeta));
- assertThat(groupMeta.indexMeta(66), is(additionalMeta));
+ assertThat(groupMeta.indexMetaByLogIndex(66), is(additionalMeta));
- assertThat(groupMeta.indexMeta(100), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(100), is(nullValue()));
}
@Test
@@ -63,17 +63,17 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
groupMeta.addIndexMeta(additionalMeta);
- assertThat(groupMeta.indexMeta(0), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(0), is(nullValue()));
- assertThat(groupMeta.indexMeta(1), is(initialMeta));
+ assertThat(groupMeta.indexMetaByLogIndex(1), is(initialMeta));
- assertThat(groupMeta.indexMeta(41), is(initialMeta));
+ assertThat(groupMeta.indexMetaByLogIndex(41), is(initialMeta));
- assertThat(groupMeta.indexMeta(42), is(additionalMeta));
+ assertThat(groupMeta.indexMetaByLogIndex(42), is(additionalMeta));
- assertThat(groupMeta.indexMeta(66), is(additionalMeta));
+ assertThat(groupMeta.indexMetaByLogIndex(66), is(additionalMeta));
- assertThat(groupMeta.indexMeta(100), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(100), is(nullValue()));
}
@Test
@@ -82,7 +82,7 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
var groupMeta = new GroupIndexMeta(initialMeta);
- assertThat(groupMeta.indexMeta(1), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(1), is(nullValue()));
assertThat(groupMeta.firstLogIndexInclusive(), is(-1L));
@@ -92,9 +92,9 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
groupMeta.addIndexMeta(additionalMeta);
- assertThat(groupMeta.indexMeta(1), is(additionalMeta));
+ assertThat(groupMeta.indexMetaByLogIndex(1), is(additionalMeta));
- assertThat(groupMeta.indexMeta(2), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(2), is(nullValue()));
assertThat(groupMeta.firstLogIndexInclusive(), is(1L));
@@ -127,7 +127,7 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
RunnableX reader = () -> {
for (int logIndex = 0; logIndex < totalLogEntries; logIndex++) {
- IndexFileMeta indexFileMeta = groupMeta.indexMeta(logIndex);
+ IndexFileMeta indexFileMeta =
groupMeta.indexMetaByLogIndex(logIndex);
if (indexFileMeta != null) {
int relativeFileOrdinal = logIndex / logEntriesPerFile;
@@ -181,7 +181,7 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
int relativeFileOrdinal = 0;
for (int logIndex = 0; logIndex < totalLogEntries; logIndex++) {
- IndexFileMeta indexFileMeta = groupMeta.indexMeta(logIndex);
+ IndexFileMeta indexFileMeta =
groupMeta.indexMetaByLogIndex(logIndex);
int nextFirstLogIndex = expectedFirstLogIndex +
logEntriesPerFile - overlap;
@@ -241,28 +241,28 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
assertThat(groupMeta.firstLogIndexInclusive(), is(1L));
assertThat(groupMeta.lastLogIndexExclusive(), is(200L));
- assertThat(groupMeta.indexMeta(10), is(meta1));
- assertThat(groupMeta.indexMeta(42), is(meta2));
- assertThat(groupMeta.indexMeta(100), is(meta3));
- assertThat(groupMeta.indexMeta(110), is(meta4));
+ assertThat(groupMeta.indexMetaByLogIndex(10), is(meta1));
+ assertThat(groupMeta.indexMetaByLogIndex(42), is(meta2));
+ assertThat(groupMeta.indexMetaByLogIndex(100), is(meta3));
+ assertThat(groupMeta.indexMetaByLogIndex(110), is(meta4));
groupMeta.truncatePrefix(43);
- assertThat(groupMeta.indexMeta(10), is(nullValue()));
- assertThat(groupMeta.indexMeta(42), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(10), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(42), is(nullValue()));
// Payload offset is shifted 4 bytes in order to skip the truncated
entry.
var trimmedMeta = new IndexFileMeta(43, 100, 46, new
FileProperties(1));
- assertThat(groupMeta.indexMeta(43), is(trimmedMeta));
- assertThat(groupMeta.indexMeta(100), is(meta3));
- assertThat(groupMeta.indexMeta(110), is(meta4));
+ assertThat(groupMeta.indexMetaByLogIndex(43), is(trimmedMeta));
+ assertThat(groupMeta.indexMetaByLogIndex(100), is(meta3));
+ assertThat(groupMeta.indexMetaByLogIndex(110), is(meta4));
groupMeta.truncatePrefix(110);
- assertThat(groupMeta.indexMeta(43), is(nullValue()));
- assertThat(groupMeta.indexMeta(100), is(nullValue()));
- assertThat(groupMeta.indexMeta(110), is(meta4));
+ assertThat(groupMeta.indexMetaByLogIndex(43), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(100), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(110), is(meta4));
}
@Test
@@ -276,8 +276,8 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
// Truncate to the end of last meta - everything should be removed.
groupMeta.truncatePrefix(20);
- assertThat(groupMeta.indexMeta(0), is(nullValue()));
- assertThat(groupMeta.indexMeta(19), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(0), is(nullValue()));
+ assertThat(groupMeta.indexMetaByLogIndex(19), is(nullValue()));
assertThat(groupMeta.firstLogIndexInclusive(), is(-1L));
}
@@ -294,9 +294,9 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
var compactedMeta2 = new IndexFileMeta(50, 100, 42, new
FileProperties(1, 1));
groupMeta.onIndexCompacted(new FileProperties(1), compactedMeta2);
- assertThat(groupMeta.indexMeta(1), is(meta1));
- assertThat(groupMeta.indexMeta(50), is(compactedMeta2));
- assertThat(groupMeta.indexMeta(100), is(meta3));
+ assertThat(groupMeta.indexMetaByLogIndex(1), is(meta1));
+ assertThat(groupMeta.indexMetaByLogIndex(50), is(compactedMeta2));
+ assertThat(groupMeta.indexMetaByLogIndex(100), is(meta3));
}
@Test
@@ -317,17 +317,127 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
var compactedMeta1 = new IndexFileMeta(1, 100, 0, new
FileProperties(0, 1));
groupMeta.onIndexCompacted(new FileProperties(0), compactedMeta1);
- assertThat(groupMeta.indexMeta(1), is(compactedMeta1));
- assertThat(groupMeta.indexMeta(42), is(meta2));
- assertThat(groupMeta.indexMeta(100), is(meta3));
+ assertThat(groupMeta.indexMetaByLogIndex(1), is(compactedMeta1));
+ assertThat(groupMeta.indexMetaByLogIndex(42), is(meta2));
+ assertThat(groupMeta.indexMetaByLogIndex(100), is(meta3));
// Compact meta3 from the newer block.
var compactedMeta3 = new IndexFileMeta(100, 200, 84, new
FileProperties(2, 1));
groupMeta.onIndexCompacted(new FileProperties(2), compactedMeta3);
- assertThat(groupMeta.indexMeta(1), is(compactedMeta1));
- assertThat(groupMeta.indexMeta(42), is(meta2));
- assertThat(groupMeta.indexMeta(100), is(compactedMeta3));
+ assertThat(groupMeta.indexMetaByLogIndex(1), is(compactedMeta1));
+ assertThat(groupMeta.indexMetaByLogIndex(42), is(meta2));
+ assertThat(groupMeta.indexMetaByLogIndex(100), is(compactedMeta3));
+ }
+
+ @Test
+ void testIndexMetaByFileOrdinal() {
+ var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(1));
+ var meta2 = new IndexFileMeta(50, 100, 42, new FileProperties(2));
+ var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(3));
+
+ var groupMeta = new GroupIndexMeta(meta1);
+ groupMeta.addIndexMeta(meta2);
+ groupMeta.addIndexMeta(meta3);
+
+ assertThat(groupMeta.indexMetaByFileOrdinal(1), is(meta1));
+ assertThat(groupMeta.indexMetaByFileOrdinal(2), is(meta2));
+ assertThat(groupMeta.indexMetaByFileOrdinal(3), is(meta3));
+
+ // Ordinals before the first and after the last return null.
+ assertThat(groupMeta.indexMetaByFileOrdinal(0), is(nullValue()));
+ assertThat(groupMeta.indexMetaByFileOrdinal(4), is(nullValue()));
+ }
+
+ @Test
+ void testIndexMetaByFileOrdinalWithMultipleBlocks() {
+ // meta1 is in block 0.
+ var meta1 = new IndexFileMeta(1, 100, 0, new FileProperties(1));
+ var groupMeta = new GroupIndexMeta(meta1);
+
+ // meta2 overlaps meta1, creating a second deque block.
+ var meta2 = new IndexFileMeta(42, 100, 42, new FileProperties(2));
+ groupMeta.addIndexMeta(meta2);
+
+ // meta3 is appended to the second block (consecutive to meta2).
+ var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(3));
+ groupMeta.addIndexMeta(meta3);
+
+ assertThat(groupMeta.indexMetaByFileOrdinal(1), is(meta1));
+ assertThat(groupMeta.indexMetaByFileOrdinal(2), is(meta2));
+ assertThat(groupMeta.indexMetaByFileOrdinal(3), is(meta3));
+
+ // Ordinals before the first and after the last return null.
+ assertThat(groupMeta.indexMetaByFileOrdinal(0), is(nullValue()));
+ assertThat(groupMeta.indexMetaByFileOrdinal(4), is(nullValue()));
+ }
+
+ @Test
+ void testIndexMetaByFileOrdinalAfterTruncatePrefix() {
+ var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0));
+ var meta2 = new IndexFileMeta(50, 100, 42, new FileProperties(1));
+ var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(2));
+
+ var groupMeta = new GroupIndexMeta(meta1);
+ groupMeta.addIndexMeta(meta2);
+ groupMeta.addIndexMeta(meta3);
+
+ // After prefix truncation, ordinal 0 is dropped; ordinals 1 and 2
must still be found correctly.
+ groupMeta.truncatePrefix(75);
+
+ assertThat(groupMeta.indexMetaByFileOrdinal(0), is(nullValue()));
+
+ // meta2 was trimmed – the ordinal is the same but
firstLogIndexInclusive changed.
+ IndexFileMeta trimmedMeta2 = groupMeta.indexMetaByFileOrdinal(1);
+ assertThat(trimmedMeta2, is(notNullValue()));
+ assertThat(trimmedMeta2.firstLogIndexInclusive(), is(75L));
+ assertThat(trimmedMeta2.indexFileProperties(), is(new
FileProperties(1)));
+
+ assertThat(groupMeta.indexMetaByFileOrdinal(2), is(meta3));
+ }
+
+ @Test
+ void testIndexMetaByFileOrdinalAfterTruncatePrefixWithMultipleBlocks() {
+ // meta1 is in block 0.
+ var meta1 = new IndexFileMeta(1, 100, 0, new FileProperties(1));
+ var groupMeta = new GroupIndexMeta(meta1);
+
+ // meta2 overlaps meta1, creating a second deque block.
+ var meta2 = new IndexFileMeta(42, 100, 42, new FileProperties(2));
+ groupMeta.addIndexMeta(meta2);
+
+ // meta3 is appended to the second block (consecutive to meta2).
+ var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(3));
+ groupMeta.addIndexMeta(meta3);
+
+ // After prefix truncation, ordinal 1 is dropped; ordinals 2 and 3
must still be found correctly.
+ groupMeta.truncatePrefix(75);
+
+ assertThat(groupMeta.indexMetaByFileOrdinal(1), is(nullValue()));
+
+ // meta2 was trimmed – the ordinal is the same but
firstLogIndexInclusive changed.
+ IndexFileMeta trimmedMeta2 = groupMeta.indexMetaByFileOrdinal(2);
+ assertThat(trimmedMeta2, is(notNullValue()));
+ assertThat(trimmedMeta2.firstLogIndexInclusive(), is(75L));
+ assertThat(trimmedMeta2.indexFileProperties(), is(new
FileProperties(2)));
+
+ assertThat(groupMeta.indexMetaByFileOrdinal(3), is(meta3));
+ }
+
+ @Test
+ void testIndexMetaByFileOrdinalAfterCompaction() {
+ var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0));
+ var meta2 = new IndexFileMeta(50, 100, 42, new FileProperties(1));
+
+ var groupMeta = new GroupIndexMeta(meta1);
+ groupMeta.addIndexMeta(meta2);
+
+ // Simulate compaction: ordinal stays the same, generation is bumped.
+ var compactedMeta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0,
1));
+ groupMeta.onIndexCompacted(new FileProperties(0), compactedMeta1);
+
+ assertThat(groupMeta.indexMetaByFileOrdinal(0), is(compactedMeta1));
+ assertThat(groupMeta.indexMetaByFileOrdinal(1), is(meta2));
}
@RepeatedTest(100)
@@ -347,7 +457,7 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
RunnableX truncateTask = () -> groupMeta.truncatePrefix(43);
RunnableX readTask = () -> {
- IndexFileMeta indexFileMeta = groupMeta.indexMeta(51);
+ IndexFileMeta indexFileMeta = groupMeta.indexMetaByLogIndex(51);
assertThat(indexFileMeta, is(notNullValue()));
assertThat(indexFileMeta.firstLogIndexInclusive(),
is(anyOf(equalTo(42L), equalTo(43L))));
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
index d17429bbb10..a476d09c614 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
@@ -17,18 +17,18 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static ca.seinesoftware.hamcrest.path.PathMatcher.exists;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.when;
+import static org.hamcrest.Matchers.not;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -37,7 +37,6 @@ import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -47,18 +46,15 @@ import
org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.RunnableX;
import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import
org.apache.ignite.internal.raft.storage.segstore.GroupInfoProvider.GroupInfo;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
/**
@@ -89,9 +85,6 @@ class RaftLogGarbageCollectorTest extends IgniteAbstractTest {
private RaftLogGarbageCollector garbageCollector;
- @Mock
- private GroupInfoProvider groupInfoProvider;
-
@BeforeEach
void setUp() throws IOException {
fileManager = new SegmentFileManager(
@@ -99,7 +92,6 @@ class RaftLogGarbageCollectorTest extends IgniteAbstractTest {
workDir,
STRIPES,
new NoOpFailureManager(),
- groupInfoProvider,
raftConfiguration,
storageConfiguration
);
@@ -119,16 +111,17 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
}
@Test
- void testCompactSegmentFileWithAllEntriesTruncated() throws Exception {
+ void testRunCompactionWithFileFullyCompacted() throws Exception {
+ // Fill some segment files with entries.
List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
for (int i = 0; i < batches.size(); i++) {
appendBytes(GROUP_ID_1, batches.get(i), i);
}
- await().until(this::indexFiles, hasSize(greaterThan(0)));
+ // Truncate most of the previously inserted entries.
+ fileManager.truncatePrefix(GROUP_ID_1, batches.size() - 1);
- // This is equivalent to prefix truncation up to the latest index.
- when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new
GroupInfo(batches.size() - 1, batches.size() - 1));
+ triggerAndAwaitCheckpoint(batches.size() - 1);
List<Path> segmentFiles = segmentFiles();
@@ -138,127 +131,122 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath,
false);
try {
- garbageCollector.compactSegmentFile(segmentFile);
+ garbageCollector.runCompaction(segmentFile);
} finally {
segmentFile.close();
}
- assertThat(Files.exists(segmentFilePath), is(false));
-
assertThat(Files.exists(indexFileManager.indexFilePath(fileProperties)),
is(false));
+ assertThat(segmentFilePath, not(exists()));
+ assertThat(indexFileManager.indexFilePath(fileProperties),
not(exists()));
// Validate that no files with increased generation have been created.
var newFileProperties = new FileProperties(fileProperties.ordinal(),
fileProperties.generation() + 1);
assertThat(segmentFiles(), hasSize(segmentFiles.size() - 1));
-
assertThat(Files.exists(fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties))),
is(false));
-
assertThat(Files.exists(indexFileManager.indexFilePath(newFileProperties)),
is(false));
+
assertThat(fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties)),
not(exists()));
+ assertThat(indexFileManager.indexFilePath(newFileProperties),
not(exists()));
}
@Test
- void testCompactSegmentFileWithSomeEntriesTruncated() throws Exception {
- List<byte[]> batches = createRandomData(FILE_SIZE / 8, 10);
+ void testRunCompactionWithFilePartiallyCompacted() throws Exception {
+ // Fill some segment files with entries from two groups.
+ List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
for (int i = 0; i < batches.size(); i++) {
appendBytes(GROUP_ID_1, batches.get(i), i);
appendBytes(GROUP_ID_2, batches.get(i), i);
}
- await().until(this::indexFiles, hasSize(greaterThan(0)));
+ // Truncate entries of only one group.
+ fileManager.truncatePrefix(GROUP_ID_1, batches.size() - 1);
+
+ triggerAndAwaitCheckpoint(batches.size() - 1);
List<Path> segmentFiles = segmentFiles();
long originalSize = Files.size(segmentFiles.get(0));
- // Emulate truncation of one group
- when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new
GroupInfo(batches.size() - 1, batches.size() - 1));
- when(groupInfoProvider.groupInfo(GROUP_ID_2)).thenReturn(new
GroupInfo(1, batches.size() - 1));
-
Path firstSegmentFile = segmentFiles.get(0);
FileProperties originalFileProperties =
SegmentFile.fileProperties(firstSegmentFile);
SegmentFile segmentFile = SegmentFile.openExisting(firstSegmentFile,
false);
try {
- garbageCollector.compactSegmentFile(segmentFile);
+ garbageCollector.runCompaction(segmentFile);
} finally {
segmentFile.close();
}
- assertThat(Files.exists(firstSegmentFile), is(false));
+ // Segment file should be replaced by a new one with increased
generation.
+ assertThat(firstSegmentFile, not(exists()));
var newFileProperties = new
FileProperties(originalFileProperties.ordinal(),
originalFileProperties.generation() + 1);
Path newSegmentFile =
fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties));
- assertThat(Files.exists(newSegmentFile), is(true));
+ assertThat(newSegmentFile, exists());
assertThat(Files.size(newSegmentFile), lessThan(originalSize));
-
assertThat(Files.exists(indexFileManager.indexFilePath(newFileProperties)),
is(true));
-
assertThat(Files.exists(indexFileManager.indexFilePath(originalFileProperties)),
is(false));
+ assertThat(indexFileManager.indexFilePath(newFileProperties),
exists());
+ assertThat(indexFileManager.indexFilePath(originalFileProperties),
not(exists()));
}
@Test
- void testCompactSegmentFileWithTruncationRecords() throws Exception {
+ void testRunCompactionWithTruncationRecords() throws Exception {
List<byte[]> batches = createRandomData(FILE_SIZE / 4, 5);
for (int i = 0; i < batches.size(); i++) {
appendBytes(GROUP_ID_1, batches.get(i), i);
}
- fileManager.truncatePrefix(GROUP_ID_1, 2);
- fileManager.truncateSuffix(GROUP_ID_1, 3);
-
- await().until(this::indexFiles, hasSize(greaterThan(0)));
+ // Truncate both prefix and suffix.
+ fileManager.truncatePrefix(GROUP_ID_1, batches.size() / 2);
+ fileManager.truncateSuffix(GROUP_ID_1, batches.size() / 2);
List<Path> segmentFiles = segmentFiles();
- assertThat(segmentFiles, hasSize(greaterThan(1)));
- // This is equivalent to prefix truncation up to the latest index.
- when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new
GroupInfo(batches.size() - 1, batches.size() - 1));
+ triggerAndAwaitCheckpoint(batches.size() / 2);
- Path firstSegmentFile = segmentFiles.get(0);
+ for (Path segmentFilePath : segmentFiles) {
+ SegmentFile segmentFile =
SegmentFile.openExisting(segmentFilePath, false);
- SegmentFile segmentFile = SegmentFile.openExisting(firstSegmentFile,
false);
- try {
- garbageCollector.compactSegmentFile(segmentFile);
- } finally {
- segmentFile.close();
- }
+ try {
+ garbageCollector.runCompaction(segmentFile);
+ } finally {
+ segmentFile.close();
+ }
- assertThat(Files.exists(firstSegmentFile), is(false));
+ assertThat(segmentFilePath, not(exists()));
+ }
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-27964")
@RepeatedTest(10)
void testConcurrentCompactionAndReads() throws Exception {
List<byte[]> batches = createRandomData(FILE_SIZE / 10, 50);
+
for (int i = 0; i < batches.size(); i++) {
appendBytes(GROUP_ID_1, batches.get(i), i);
}
await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size()
- 1)));
- var firstAliveIndex = new AtomicLong();
var gcTaskDone = new AtomicBoolean(false);
- when(groupInfoProvider.groupInfo(GROUP_ID_1))
- .thenAnswer(invocationOnMock -> new
GroupInfo(firstAliveIndex.get(), batches.size() - 1));
-
RunnableX gcTask = () -> {
try {
List<Path> segmentFiles = segmentFiles();
Path lastSegmentFile = segmentFiles.get(segmentFiles.size() -
1);
+ long aliveIndex = 0;
+
// Don't compact the last segment file.
while (!segmentFiles.get(0).equals(lastSegmentFile)) {
- long index = firstAliveIndex.incrementAndGet();
-
- fileManager.truncatePrefix(GROUP_ID_1, index);
+ fileManager.truncatePrefix(GROUP_ID_1, ++aliveIndex);
SegmentFile segmentFile =
SegmentFile.openExisting(segmentFiles.get(0), false);
try {
- garbageCollector.compactSegmentFile(segmentFile);
+ garbageCollector.runCompaction(segmentFile);
} finally {
segmentFile.close();
}
@@ -288,10 +276,10 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
runRace(gcTask, readTask, readTask, readTask);
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-27964")
@RepeatedTest(10)
void testConcurrentCompactionAndReadsFromMultipleGroups() throws Exception
{
List<byte[]> batches = createRandomData(FILE_SIZE / 10, 50);
+
for (int i = 0; i < batches.size(); i++) {
appendBytes(GROUP_ID_1, batches.get(i), i);
appendBytes(GROUP_ID_2, batches.get(i), i);
@@ -299,16 +287,8 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size()
- 1)));
- var firstAliveIndex = new AtomicLong();
var gcTaskDone = new AtomicBoolean(false);
- when(groupInfoProvider.groupInfo(GROUP_ID_1))
- .thenAnswer(invocationOnMock -> new
GroupInfo(firstAliveIndex.get(), batches.size() - 1));
-
- // Group 2 is never compacted.
- when(groupInfoProvider.groupInfo(GROUP_ID_2))
- .thenAnswer(invocationOnMock -> new GroupInfo(0,
batches.size() - 1));
-
RunnableX gcTask = () -> {
try {
List<Path> segmentFiles = segmentFiles();
@@ -319,10 +299,10 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
// Because of that we need to use more complex logic to
iterate over the segment files.
int segmentFilesIndex = 0;
- while (true) {
- long index = firstAliveIndex.incrementAndGet();
+ long aliveIndex = 0;
- fileManager.truncatePrefix(GROUP_ID_1, index);
+ while (true) {
+ fileManager.truncatePrefix(GROUP_ID_1, ++aliveIndex);
FileProperties fileProperties =
SegmentFile.fileProperties(curSegmentFilePath);
@@ -330,7 +310,7 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
SegmentFile segmentFile =
SegmentFile.openExisting(curSegmentFilePath, false);
try {
- garbageCollector.compactSegmentFile(segmentFile);
+ garbageCollector.runCompaction(segmentFile);
} finally {
segmentFile.close();
}
@@ -400,19 +380,16 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
appendBytes(GROUP_ID_2, batches.get(i), i);
}
+ // Truncate entries of only one group.
fileManager.truncatePrefix(GROUP_ID_1, batches.size() - 1);
- await().until(this::indexFiles, hasSize(greaterThan(0)));
+ triggerAndAwaitCheckpoint(batches.size() - 1);
List<Path> segmentFiles = segmentFiles();
- // Emulate truncation of one group
- when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new
GroupInfo(batches.size() - 1, batches.size() - 1));
- when(groupInfoProvider.groupInfo(GROUP_ID_2)).thenReturn(new
GroupInfo(1, batches.size() - 1));
-
SegmentFile segmentFile =
SegmentFile.openExisting(segmentFiles.get(0), false);
try {
- garbageCollector.compactSegmentFile(segmentFile);
+ garbageCollector.runCompaction(segmentFile);
} finally {
segmentFile.close();
}
@@ -424,7 +401,6 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
workDir,
STRIPES,
new NoOpFailureManager(),
- groupInfoProvider,
raftConfiguration,
storageConfiguration
);
@@ -480,25 +456,51 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
workDir,
STRIPES,
new NoOpFailureManager(),
- groupInfoProvider,
raftConfiguration,
storageConfiguration
);
fileManager.garbageCollector().cleanupLeftoverFiles();
- // Verify temporary files are cleaned up
- assertFalse(Files.exists(tmpFile1));
- assertFalse(Files.exists(tmpFile2));
+ // Verify temporary files are cleaned up.
+ assertThat(tmpFile1, not(exists()));
+ assertThat(tmpFile2, not(exists()));
- // Verify duplicate segment files are cleaned up (lower generation
removed)
- assertFalse(Files.exists(segmentFile1Gen0));
- assertTrue(Files.exists(segmentFile1Gen1));
- assertFalse(Files.exists(indexFile1Gen0));
- assertTrue(Files.exists(indexFile1Gen1));
+ // Verify duplicate segment files are cleaned up (lower generation
removed).
+ assertThat(segmentFile1Gen0, not(exists()));
+ assertThat(segmentFile1Gen1, exists());
+ assertThat(indexFile1Gen0, not(exists()));
+ assertThat(indexFile1Gen1, exists());
- // Verify orphaned index files are cleaned up
- assertFalse(Files.exists(orphanedIndexFile));
+ // Verify orphaned index files are cleaned up.
+ assertThat(orphanedIndexFile, not(exists()));
+ }
+
+ @Test
+ void testRunCompactionDoesNotRetainEntriesTruncatedBySuffix() throws
Exception {
+ List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ }
+
+ fileManager.truncateSuffix(GROUP_ID_1, 1);
+
+ triggerAndAwaitCheckpoint(1);
+
+ // Since we truncated all entries after log index 1, we can expect
that the second segment file will be fully compacted.
+ List<Path> segmentFiles = segmentFiles();
+
+ assertThat(segmentFiles, hasSize(greaterThan(2)));
+
+ SegmentFile segmentFile =
SegmentFile.openExisting(segmentFiles.get(1), false);
+ try {
+ garbageCollector.runCompaction(segmentFile);
+ } finally {
+ segmentFile.close();
+ }
+
+ assertThat(segmentFiles.get(1), not(exists()));
}
private List<Path> segmentFiles() throws IOException {
@@ -546,4 +548,22 @@ class RaftLogGarbageCollectorTest extends
IgniteAbstractTest {
}
});
}
+
+ private void triggerAndAwaitCheckpoint(long lastGroupIndex) throws
IOException {
+ List<Path> segmentFilesBeforeCheckpoint = segmentFiles();
+
+ // Insert some entries to trigger a rollover (and a checkpoint).
+ List<byte[]> batches = createRandomData(FILE_SIZE / 4, 5);
+
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), lastGroupIndex + i + 1);
+ }
+
+ List<Path> segmentFilesAfterCheckpoint = segmentFiles();
+
+ assertThat(segmentFilesAfterCheckpoint,
hasSize(greaterThan(segmentFilesBeforeCheckpoint.size())));
+
+ // Wait for the checkpoint process to complete.
+ await().until(this::indexFiles,
hasSize(greaterThanOrEqualTo(segmentFilesAfterCheckpoint.size() - 1)));
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
index 6828daf5e4d..c0eb09c6b53 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
@@ -87,7 +87,6 @@ class SegmentFileManagerGetEntryTest extends
IgniteAbstractTest {
workDir,
STRIPES,
new NoOpFailureManager(),
- groupId -> null,
raftConfiguration,
storageConfiguration
);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index 6ff0d7c9460..8c807827032 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -119,7 +119,6 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
workDir,
STRIPES,
failureManager,
- GroupInfoProvider.NO_OP,
raftConfiguration,
storageConfiguration
);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
index b6237aa569c..005223b3b0e 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
@@ -61,7 +61,6 @@ class SegstoreLogStorageConcurrencyTest extends
IgniteAbstractTest {
workDir,
1,
new NoOpFailureManager(),
- GroupInfoProvider.NO_OP,
raftConfiguration,
storageConfiguration
);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index 9009e122163..c058d46078d 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -64,7 +64,6 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
path,
1,
new NoOpFailureManager(),
- GroupInfoProvider.NO_OP,
raftConfiguration,
storageConfiguration
);