This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c0c8663ee5e IGNITE-26294 Implement Raft log Garbage Collector (#7610)
c0c8663ee5e is described below
commit c0c8663ee5e75e929f97f1ee48dedd1da5f3a840
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Feb 25 18:16:47 2026 +0200
IGNITE-26294 Implement Raft log Garbage Collector (#7610)
---
.../raft/storage/segstore/GroupIndexMeta.java | 60 ++-
.../raft/storage/segstore/GroupInfoProvider.java | 50 ++
.../raft/storage/segstore/IndexFileManager.java | 199 ++++++--
.../raft/storage/segstore/IndexFileMetaArray.java | 49 ++
.../storage/segstore/RaftLogGarbageCollector.java | 316 ++++++++++++
.../raft/storage/segstore/SegmentFile.java | 23 +-
.../raft/storage/segstore/SegmentFileManager.java | 61 ++-
.../storage/segstore/SegmentPayloadParser.java | 6 +-
.../raft/storage/segstore/GroupIndexMetaTest.java | 79 +++
.../storage/segstore/IndexFileManagerTest.java | 66 ++-
.../segstore/RaftLogGarbageCollectorTest.java | 549 +++++++++++++++++++++
.../segstore/SegmentFileManagerGetEntryTest.java | 1 +
.../storage/segstore/SegmentFileManagerTest.java | 6 +-
.../raft/storage/segstore/SegmentFileTest.java | 3 +-
.../SegstoreLogStorageConcurrencyTest.java | 1 +
.../storage/segstore/SegstoreLogStorageTest.java | 1 +
16 files changed, 1363 insertions(+), 107 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
index 912fedb951a..edf3c50c54b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
@@ -46,12 +46,6 @@ class GroupIndexMeta {
this.fileMetas = new IndexFileMetaArray(startFileMeta);
}
- void addIndexMeta(IndexFileMeta indexFileMeta) {
- IndexFileMetaArray fileMetas = this.fileMetas;
-
- setFileMetas(fileMetas, fileMetas.add(indexFileMeta));
- }
-
long firstLogIndexInclusive() {
return fileMetas.firstLogIndexInclusive();
}
@@ -60,21 +54,48 @@ class GroupIndexMeta {
return fileMetas.lastLogIndexExclusive();
}
+ void addIndexMeta(IndexFileMeta indexFileMeta) {
+ while (true) {
+ IndexFileMetaArray fileMetas = this.fileMetas;
+
+ IndexFileMetaArray newFileMetas = fileMetas.add(indexFileMeta);
+
+ if (FILE_METAS_VH.compareAndSet(this, fileMetas,
newFileMetas)) {
+ return;
+ }
+ }
+ }
+
/**
* Removes all metas whose log indices are smaller than the given
value.
*/
void truncateIndicesSmallerThan(long firstLogIndexKept) {
- IndexFileMetaArray fileMetas = this.fileMetas;
+ while (true) {
+ IndexFileMetaArray fileMetas = this.fileMetas;
+
+ IndexFileMetaArray newFileMetas =
fileMetas.truncateIndicesSmallerThan(firstLogIndexKept);
- setFileMetas(fileMetas,
fileMetas.truncateIndicesSmallerThan(firstLogIndexKept));
+ if (FILE_METAS_VH.compareAndSet(this, fileMetas,
newFileMetas)) {
+ return;
+ }
+ }
}
- private void setFileMetas(IndexFileMetaArray fileMetas,
IndexFileMetaArray newFileMetas) {
- // Simple assignment would suffice, since we only have one thread
writing to this field, but we use compareAndSet to verify
- // this invariant, just in case.
- boolean updated = FILE_METAS_VH.compareAndSet(this, fileMetas,
newFileMetas);
+ boolean onIndexCompacted(FileProperties oldProperties, IndexFileMeta
newProperties) {
+ while (true) {
+ IndexFileMetaArray fileMetas = this.fileMetas;
+
+ IndexFileMetaArray newFileMetas =
fileMetas.onIndexCompacted(oldProperties, newProperties);
+
+            // Nothing was updated, which means the array does not contain
index meta for the compacted file.
+ if (fileMetas == newFileMetas) {
+ return false;
+ }
- assert updated : "Concurrent writes detected";
+ if (FILE_METAS_VH.compareAndSet(this, fileMetas,
newFileMetas)) {
+ return true;
+ }
+ }
}
}
@@ -183,6 +204,19 @@ class GroupIndexMeta {
}
}
+ /**
+ * Called when an index file is being compacted by the GC.
+ *
+ * <p>This means that we need to find index meta for the file being
compacted and replace it with the meta of the new file.
+ */
+ void onIndexCompacted(FileProperties oldProperties, IndexFileMeta
newIndexMeta) {
+ for (IndexMetaArrayHolder holder : fileMetaDeque) {
+ if (holder.onIndexCompacted(oldProperties, newIndexMeta)) {
+ return;
+ }
+ }
+ }
+
long firstLogIndexInclusive() {
for (IndexMetaArrayHolder indexMetaArrayHolder : fileMetaDeque) {
long firstLogIndex = indexMetaArrayHolder.firstLogIndexInclusive();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupInfoProvider.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupInfoProvider.java
new file mode 100644
index 00000000000..186a06d31fa
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupInfoProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides information about a Raft group present in the storage.
+ */
+@FunctionalInterface
+interface GroupInfoProvider {
+ GroupInfoProvider NO_OP = groupId -> null;
+
+ class GroupInfo {
+ private final long firstLogIndexInclusive;
+
+ private final long lastLogIndexExclusive;
+
+ GroupInfo(long firstLogIndexInclusive, long lastLogIndexExclusive) {
+ this.firstLogIndexInclusive = firstLogIndexInclusive;
+ this.lastLogIndexExclusive = lastLogIndexExclusive;
+ }
+
+ long firstLogIndexInclusive() {
+ return firstLogIndexInclusive;
+ }
+
+ long lastLogIndexExclusive() {
+ return lastLogIndexExclusive;
+ }
+ }
+
+ @Nullable
+ GroupInfo groupInfo(long groupId);
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index f5bb44f8635..468b51530a4 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -32,9 +32,12 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -129,6 +132,7 @@ class IndexFileManager {
/**
* Index file metadata grouped by Raft Group ID.
*/
+ // FIXME: This map is never cleaned up, see
https://issues.apache.org/jira/browse/IGNITE-27926.
private final Map<Long, GroupIndexMeta> groupIndexMetas = new
ConcurrentHashMap<>();
IndexFileManager(Path baseDir) throws IOException {
@@ -151,7 +155,7 @@ class IndexFileManager {
return indexFilesDir;
}
- void cleanupTmpFiles() throws IOException {
+ void cleanupLeftoverFiles() throws IOException {
try (Stream<Path> indexFiles = Files.list(indexFilesDir)) {
Iterator<Path> it = indexFiles.iterator();
@@ -175,25 +179,31 @@ class IndexFileManager {
Path saveNewIndexMemtable(ReadModeIndexMemTable indexMemTable) throws
IOException {
var newFileProperties = new FileProperties(++curFileOrdinal);
- return saveIndexMemtable(indexMemTable, newFileProperties, false);
+ Path indexFilePath = indexFilePath(newFileProperties);
+
+ List<IndexMetaSpec> metaSpecs = saveIndexMemtable(indexFilePath,
indexMemTable, newFileProperties);
+
+ metaSpecs.forEach(this::putIndexFileMeta);
+
+ return indexFilePath;
}
- private Path saveIndexMemtable(
+ private List<IndexMetaSpec> saveIndexMemtable(
+ Path indexFilePath,
ReadModeIndexMemTable indexMemTable,
- FileProperties fileProperties,
- boolean onRecovery
+ FileProperties fileProperties
) throws IOException {
- String fileName = indexFileName(fileProperties);
+ String fileName = indexFilePath.getFileName().toString();
Path tmpFilePath = indexFilesDir.resolve(fileName + TMP_FILE_SUFFIX);
assert !Files.exists(indexFilesDir.resolve(fileName)) : "Index file
already exists: " + fileName;
assert !Files.exists(tmpFilePath) : "Temporary index file already
exists: " + tmpFilePath;
- try (var os = new
BufferedOutputStream(Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE))) {
- byte[] headerBytes = serializeHeaderAndFillMetadata(indexMemTable,
fileProperties, onRecovery);
+ FileHeaderWithIndexMetas fileHeaderWithIndexMetas =
serializeHeaderAndFillMetadata(indexMemTable, fileProperties);
- os.write(headerBytes);
+ try (var os = new
BufferedOutputStream(Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE))) {
+ os.write(fileHeaderWithIndexMetas.header());
Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
@@ -207,7 +217,9 @@ class IndexFileManager {
}
}
- return syncAndRename(tmpFilePath,
tmpFilePath.resolveSibling(fileName));
+ syncAndRename(tmpFilePath, tmpFilePath.resolveSibling(fileName));
+
+ return fileHeaderWithIndexMetas.indexMetas();
}
/**
@@ -215,7 +227,33 @@ class IndexFileManager {
* lost due to a component stop before a checkpoint was able to complete.
*/
void recoverIndexFile(ReadModeIndexMemTable indexMemTable, FileProperties
fileProperties) throws IOException {
- saveIndexMemtable(indexMemTable, fileProperties, true);
+ // On recovery we are only creating missing index files, in-memory
meta will be created on Index File Manager start.
+ // (see recoverIndexFileMetas).
+ saveIndexMemtable(indexFilePath(fileProperties), indexMemTable,
fileProperties);
+ }
+
+ Path onIndexFileCompacted(
+ ReadModeIndexMemTable indexMemTable,
+ FileProperties oldIndexFileProperties,
+ FileProperties newIndexFileProperties
+ ) throws IOException {
+ Path newIndexFilePath = indexFilePath(newIndexFileProperties);
+
+ List<IndexMetaSpec> metaSpecs = saveIndexMemtable(newIndexFilePath,
indexMemTable, newIndexFileProperties);
+
+ metaSpecs.forEach(metaSpec -> {
+ GroupIndexMeta groupIndexMeta =
groupIndexMetas.get(metaSpec.groupId);
+
+ IndexFileMeta meta = metaSpec.indexFileMeta();
+
+ if (groupIndexMeta != null && meta != null) {
+ groupIndexMeta.onIndexCompacted(oldIndexFileProperties, meta);
+ }
+ });
+
+ LOG.info("New index file created after compaction [path={}].",
newIndexFilePath);
+
+ return newIndexFilePath;
}
/**
@@ -224,43 +262,51 @@ class IndexFileManager {
*/
@Nullable
SegmentFilePointer getSegmentFilePointer(long groupId, long logIndex)
throws IOException {
- GroupIndexMeta groupIndexMeta = groupIndexMetas.get(groupId);
+ while (true) {
+ GroupIndexMeta groupIndexMeta = groupIndexMetas.get(groupId);
- if (groupIndexMeta == null) {
- return null;
- }
+ if (groupIndexMeta == null) {
+ return null;
+ }
- IndexFileMeta indexFileMeta = groupIndexMeta.indexMeta(logIndex);
+ IndexFileMeta indexFileMeta = groupIndexMeta.indexMeta(logIndex);
- if (indexFileMeta == null) {
- return null;
- }
+ if (indexFileMeta == null) {
+ return null;
+ }
- Path indexFile =
indexFilesDir.resolve(indexFileName(indexFileMeta.indexFileProperties()));
+ Path indexFile =
indexFilesDir.resolve(indexFileName(indexFileMeta.indexFileProperties()));
- // Index file payload is a 0-based array, which indices correspond to
the [fileMeta.firstLogIndex, fileMeta.lastLogIndex) range.
- long payloadArrayIndex = logIndex -
indexFileMeta.firstLogIndexInclusive();
+ // TODO: Consider caching the opened channels for index files:
https://issues.apache.org/jira/browse/IGNITE-26622.
+ try (SeekableByteChannel channel = Files.newByteChannel(indexFile,
StandardOpenOption.READ)) {
+            // Index file payload is a 0-based array, whose indices
correspond to the [fileMeta.firstLogIndex, fileMeta.lastLogIndex)
+ // range.
+ long payloadArrayIndex = logIndex -
indexFileMeta.firstLogIndexInclusive();
- assert payloadArrayIndex >= 0 : payloadArrayIndex;
+ assert payloadArrayIndex >= 0 : payloadArrayIndex;
- long payloadOffset = indexFileMeta.indexFilePayloadOffset() +
payloadArrayIndex * Integer.BYTES;
+ long payloadOffset = indexFileMeta.indexFilePayloadOffset() +
payloadArrayIndex * Integer.BYTES;
- try (SeekableByteChannel channel = Files.newByteChannel(indexFile,
StandardOpenOption.READ)) {
- channel.position(payloadOffset);
+ channel.position(payloadOffset);
- ByteBuffer segmentPayloadOffsetBuffer =
ByteBuffer.allocate(Integer.BYTES).order(BYTE_ORDER);
+ ByteBuffer segmentPayloadOffsetBuffer =
ByteBuffer.allocate(Integer.BYTES).order(BYTE_ORDER);
- while (segmentPayloadOffsetBuffer.hasRemaining()) {
- int bytesRead = channel.read(segmentPayloadOffsetBuffer);
+ while (segmentPayloadOffsetBuffer.hasRemaining()) {
+ int bytesRead = channel.read(segmentPayloadOffsetBuffer);
- if (bytesRead == -1) {
- throw new EOFException("EOF reached while reading index
file: " + indexFile);
+ if (bytesRead == -1) {
+ throw new EOFException("EOF reached while reading
index file: " + indexFile);
+ }
}
- }
- int segmentPayloadOffset = segmentPayloadOffsetBuffer.getInt(0);
+ int segmentPayloadOffset =
segmentPayloadOffsetBuffer.getInt(0);
- return new SegmentFilePointer(indexFileMeta.indexFileProperties(),
segmentPayloadOffset);
+ return new
SegmentFilePointer(indexFileMeta.indexFileProperties(), segmentPayloadOffset);
+ } catch (NoSuchFileException e) {
+ // There exists a race between the Garbage Collection process
and "groupIndexMetas.get" call. It is possible that
+ // groupIndexMetas returned stale information, we should
simply retry in this case.
+ LOG.info("Index file not found, retrying [path={}].",
indexFile);
+ }
}
}
@@ -282,15 +328,14 @@ class IndexFileManager {
return groupIndexMeta == null ? -1 :
groupIndexMeta.lastLogIndexExclusive();
}
- boolean indexFileExists(FileProperties fileProperties) {
- return Files.exists(indexFilePath(fileProperties));
- }
-
Path indexFilePath(FileProperties fileProperties) {
return indexFilesDir.resolve(indexFileName(fileProperties));
}
- private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable
indexMemTable, FileProperties fileProperties, boolean onRecovery) {
+ private static FileHeaderWithIndexMetas serializeHeaderAndFillMetadata(
+ ReadModeIndexMemTable indexMemTable,
+ FileProperties fileProperties
+ ) {
int numGroups = indexMemTable.numGroups();
int headerSize = headerSize(numGroups);
@@ -303,6 +348,8 @@ class IndexFileManager {
int payloadOffset = headerSize;
+ var metaSpecs = new ArrayList<IndexMetaSpec>(numGroups);
+
Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
while (it.hasNext()) {
@@ -319,15 +366,11 @@ class IndexFileManager {
long firstIndexKept = segmentInfo.firstIndexKept();
- // On recovery we are only creating missing index files, in-memory
meta will be created on Index File Manager start.
- // (see recoverIndexFileMetas).
- if (!onRecovery) {
- IndexFileMeta indexFileMeta = createIndexFileMeta(
- firstLogIndexInclusive, lastLogIndexExclusive,
firstIndexKept, payloadOffset, fileProperties
- );
+ IndexFileMeta indexFileMeta = createIndexFileMeta(
+ firstLogIndexInclusive, lastLogIndexExclusive,
firstIndexKept, payloadOffset, fileProperties
+ );
- putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
- }
+ metaSpecs.add(new IndexMetaSpec(groupId, indexFileMeta,
firstIndexKept));
headerBuffer
.putLong(groupId)
@@ -340,7 +383,7 @@ class IndexFileManager {
payloadOffset += payloadSize(segmentInfo);
}
- return headerBuffer.array();
+ return new FileHeaderWithIndexMetas(headerBuffer.array(), metaSpecs);
}
private static @Nullable IndexFileMeta createIndexFileMeta(
@@ -370,7 +413,14 @@ class IndexFileManager {
return new IndexFileMeta(firstIndexKept, lastLogIndexExclusive,
adjustedPayloadOffset, fileProperties);
}
- private void putIndexFileMeta(Long groupId, @Nullable IndexFileMeta
indexFileMeta, long firstIndexKept) {
+ private void putIndexFileMeta(IndexMetaSpec metaSpec) {
+ IndexFileMeta indexFileMeta = metaSpec.indexFileMeta();
+
+ // Using boxed value to avoid unnecessary autoboxing later.
+ Long groupId = metaSpec.groupId();
+
+ long firstIndexKept = metaSpec.firstIndexKept();
+
GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId);
if (existingGroupIndexMeta == null) {
@@ -469,12 +519,14 @@ class IndexFileManager {
firstLogIndexInclusive, lastLogIndexExclusive,
firstIndexKept, payloadOffset, fileProperties
);
- putIndexFileMeta(groupId, indexFileMeta, firstIndexKept);
+ var metaSpec = new IndexMetaSpec(groupId, indexFileMeta,
firstIndexKept);
+
+ putIndexFileMeta(metaSpec);
}
}
}
- private static FileProperties indexFileProperties(Path indexFile) {
+ static FileProperties indexFileProperties(Path indexFile) {
String fileName = indexFile.getFileName().toString();
Matcher matcher = INDEX_FILE_NAME_PATTERN.matcher(fileName);
@@ -498,4 +550,49 @@ class IndexFileManager {
return result;
}
+
+ private static class FileHeaderWithIndexMetas {
+ private final byte[] header;
+
+ private final List<IndexMetaSpec> indexMetas;
+
+ FileHeaderWithIndexMetas(byte[] header, List<IndexMetaSpec>
indexMetas) {
+ this.header = header;
+ this.indexMetas = indexMetas;
+ }
+
+ byte[] header() {
+ return header;
+ }
+
+ List<IndexMetaSpec> indexMetas() {
+ return indexMetas;
+ }
+ }
+
+ private static class IndexMetaSpec {
+ private final Long groupId;
+
+ private final @Nullable IndexFileMeta indexFileMeta;
+
+ private final long firstIndexKept;
+
+ IndexMetaSpec(Long groupId, @Nullable IndexFileMeta indexFileMeta,
long firstIndexKept) {
+ this.groupId = groupId;
+ this.indexFileMeta = indexFileMeta;
+ this.firstIndexKept = firstIndexKept;
+ }
+
+ Long groupId() {
+ return groupId;
+ }
+
+ @Nullable IndexFileMeta indexFileMeta() {
+ return indexFileMeta;
+ }
+
+ long firstIndexKept() {
+ return firstIndexKept;
+ }
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
index 46b9ffc5df8..5801bee7a16 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
@@ -163,4 +163,53 @@ class IndexFileMetaArray {
return new IndexFileMetaArray(newArray, newSize);
}
+
+ IndexFileMetaArray onIndexCompacted(FileProperties oldProperties,
IndexFileMeta newMeta) {
+ // Find index meta associated with the file being compacted.
+ int smallestOrdinal = array[0].indexFileProperties().ordinal();
+
+ assert oldProperties.ordinal() >= smallestOrdinal;
+
+ int updateIndex = oldProperties.ordinal() - smallestOrdinal;
+
+ if (updateIndex >= size) {
+ return this;
+ }
+
+ IndexFileMeta oldMeta = array[updateIndex];
+
+ assert oldMeta.indexFileProperties().equals(oldProperties)
+ : String.format("File properties mismatch [expected=%s,
actual=%s].", oldMeta.indexFileProperties(), oldProperties);
+
+ if (updateIndex > 0) {
+ IndexFileMeta prevOldMeta = array[updateIndex - 1];
+
+ assert newMeta.firstLogIndexInclusive() ==
prevOldMeta.lastLogIndexExclusive() :
+ String.format("Index File Metas must be contiguous.
Expected log index: %d, actual log index: %d",
+ prevOldMeta.lastLogIndexExclusive(),
+ newMeta.firstLogIndexInclusive()
+ );
+ }
+
+ if (updateIndex < size - 1) {
+ IndexFileMeta nextOldMeta = array[updateIndex + 1];
+
+ assert newMeta.lastLogIndexExclusive() ==
nextOldMeta.firstLogIndexInclusive() :
+ String.format("Index File Metas must be contiguous.
Expected log index: %d, actual log index: %d",
+ nextOldMeta.firstLogIndexInclusive(),
+ newMeta.lastLogIndexExclusive()
+ );
+ }
+
+ IndexFileMeta[] newArray = array.clone();
+
+ newArray[updateIndex] = newMeta;
+
+ return new IndexFileMetaArray(newArray, size);
+ }
+
+ @Override
+ public String toString() {
+ return Arrays.toString(array);
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
new file mode 100644
index 00000000000..f4fc31cf654
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static java.lang.Math.toIntExact;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.indexFileProperties;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.HEADER_RECORD;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.CRC_SIZE_BYTES;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.endOfSegmentReached;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.validateSegmentFileHeader;
+import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import
org.apache.ignite.internal.raft.storage.segstore.GroupInfoProvider.GroupInfo;
+import org.apache.ignite.internal.raft.util.VarlenEncoder;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Garbage Collector for Raft log segment files.
+ *
+ * <p>The garbage collector performs compaction of segment files by removing
truncated log entries and creating new generations
+ * of segment files. This process reclaims disk space occupied by log entries
that have been truncated via
+ * {@link LogStorage#truncatePrefix} or {@link LogStorage#truncateSuffix}
operations.
+ *
+ * <h2>Compaction Process</h2>
+ * When a segment file is selected for compaction, the GC:
+ * <ol>
+ * <li>Copies non-truncated entries to a new segment file with an
incremented generation number</li>
+ * <li>Creates a new index file for the new generation</li>
+ * <li>Atomically replaces the old segment file with the new one</li>
+ * <li>Deletes the old segment file and its index file</li>
+ * </ol>
+ */
+class RaftLogGarbageCollector {
+ private static final IgniteLogger LOG =
Loggers.forClass(RaftLogGarbageCollector.class);
+
+ private static final String TMP_FILE_SUFFIX = ".tmp";
+
+ private final Path segmentFilesDir;
+
+ private final IndexFileManager indexFileManager;
+
+ private final GroupInfoProvider groupInfoProvider;
+
+ private final AtomicLong logSize = new AtomicLong();
+
+ RaftLogGarbageCollector(
+ Path segmentFilesDir,
+ IndexFileManager indexFileManager,
+ GroupInfoProvider groupInfoProvider
+ ) {
+ this.segmentFilesDir = segmentFilesDir;
+ this.indexFileManager = indexFileManager;
+ this.groupInfoProvider = groupInfoProvider;
+ }
+
+ void cleanupLeftoverFiles() throws IOException {
+ FileProperties prevFileProperties = null;
+
+ try (Stream<Path> segmentFiles = Files.list(segmentFilesDir)) {
+ Iterator<Path> it = segmentFiles.sorted().iterator();
+
+ while (it.hasNext()) {
+ Path segmentFile = it.next();
+
+ if
(segmentFile.getFileName().toString().endsWith(TMP_FILE_SUFFIX)) {
+ LOG.info("Deleting temporary segment file [path = {}].",
segmentFile);
+
+ Files.delete(segmentFile);
+ } else {
+ FileProperties fileProperties =
SegmentFile.fileProperties(segmentFile);
+
+ if (prevFileProperties != null &&
prevFileProperties.ordinal() == fileProperties.ordinal()) {
+ Path prevPath =
segmentFilesDir.resolve(SegmentFile.fileName(prevFileProperties));
+
+ LOG.info("Deleting segment file because it has a
higher generation version [path = {}].", prevPath);
+
+ Files.delete(prevPath);
+ }
+
+ prevFileProperties = fileProperties;
+ }
+ }
+ }
+
+ // Do the same routine but for index files.
+ prevFileProperties = null;
+
+ try (Stream<Path> indexFiles =
Files.list(indexFileManager.indexFilesDir())) {
+ Iterator<Path> it = indexFiles.sorted().iterator();
+
+ while (it.hasNext()) {
+ Path indexFile = it.next();
+
+ FileProperties fileProperties = indexFileProperties(indexFile);
+
+ // The GC does not create temporary index files, they are
created by the index manager and are cleaned up by it.
+ if
(!Files.exists(segmentFilesDir.resolve(SegmentFile.fileName(fileProperties)))) {
+ LOG.info("Deleting index file because the corresponding
segment file does not exist [path = {}].", indexFile);
+
+ Files.delete(indexFile);
+ } else if (prevFileProperties != null &&
prevFileProperties.ordinal() == fileProperties.ordinal()) {
+ Path prevPath =
indexFileManager.indexFilePath(prevFileProperties);
+
+ LOG.info("Deleting index file because it has a higher
generation version [path = {}].", prevPath);
+
+ Files.deleteIfExists(prevPath);
+ }
+
+ prevFileProperties = fileProperties;
+ }
+ }
+ }
+
+ // TODO: Optimize compaction of completely truncated files, see
https://issues.apache.org/jira/browse/IGNITE-27964.
+ @VisibleForTesting
+ void compactSegmentFile(SegmentFile segmentFile) throws IOException {
+ LOG.info("Compacting segment file [path = {}].", segmentFile.path());
+
+ // Cache for avoiding excessive min/max log index computations.
+ var logStorageInfos = new Long2ObjectOpenHashMap<GroupInfo>();
+
+ ByteBuffer buffer = segmentFile.buffer();
+
+ validateSegmentFileHeader(buffer, segmentFile.path());
+
+ TmpSegmentFile tmpSegmentFile = null;
+
+ WriteModeIndexMemTable tmpMemTable = null;
+
+ try {
+ while (!endOfSegmentReached(buffer)) {
+ int originalStartOfRecordOffset = buffer.position();
+
+ long groupId = buffer.getLong();
+
+ int payloadLength = buffer.getInt();
+
+ if (payloadLength <= 0) {
+ // Skip special entries (such as truncation records). They
can always be omitted.
+ // To identify such entries we rely on the fact that they
have nonpositive length field value.
+ int endOfRecordOffset = buffer.position() + Long.BYTES +
CRC_SIZE_BYTES;
+
+ buffer.position(endOfRecordOffset);
+
+ continue;
+ }
+
+ int endOfRecordOffset = buffer.position() + payloadLength +
CRC_SIZE_BYTES;
+
+ long index = VarlenEncoder.readLong(buffer);
+
+ GroupInfo info = logStorageInfos.computeIfAbsent(groupId,
groupInfoProvider::groupInfo);
+
+ if (info == null || index < info.firstLogIndexInclusive() ||
index >= info.lastLogIndexExclusive()) {
+ // We found a truncated entry, it should be skipped.
+ buffer.position(endOfRecordOffset);
+
+ continue;
+ }
+
+ if (tmpSegmentFile == null) {
+ tmpSegmentFile = new TmpSegmentFile(segmentFile);
+
+ tmpSegmentFile.writeHeader();
+
+ tmpMemTable = new SingleThreadMemTable();
+ }
+
+ int oldLimit = buffer.limit();
+
+ // Set the buffer boundaries to only write the current record
to the new file.
+
buffer.position(originalStartOfRecordOffset).limit(endOfRecordOffset);
+
+ @SuppressWarnings("resource")
+ long newStartOfRecordOffset =
tmpSegmentFile.fileChannel().position();
+
+ writeFully(tmpSegmentFile.fileChannel(), buffer);
+
+ buffer.limit(oldLimit);
+
+ tmpMemTable.appendSegmentFileOffset(groupId, index,
toIntExact(newStartOfRecordOffset));
+ }
+
+ Path indexFilePath =
indexFileManager.indexFilePath(segmentFile.fileProperties());
+
+ long logSizeDelta;
+
+ if (tmpSegmentFile != null) {
+ tmpSegmentFile.syncAndRename();
+
+ // Create a new index file and update the in-memory state to
point to it.
+ Path newIndexFilePath = indexFileManager.onIndexFileCompacted(
+ tmpMemTable.transitionToReadMode(),
+ segmentFile.fileProperties(),
+ tmpSegmentFile.fileProperties()
+ );
+
+ logSizeDelta = Files.size(segmentFile.path())
+ + Files.size(indexFilePath)
+ - tmpSegmentFile.size()
+ - Files.size(newIndexFilePath);
+ } else {
+ // We got lucky and the whole file can be removed.
+ logSizeDelta = Files.size(segmentFile.path()) +
Files.size(indexFilePath);
+ }
+
+ // Remove the previous generation of the segment file and its
index. This is safe to do, because we rely on the file system
+ // guarantees that other threads reading from the segment file
will still be able to do that even if the file is deleted.
+ Files.delete(segmentFile.path());
+ Files.delete(indexFilePath);
+
+ long newLogSize = logSize.addAndGet(-logSizeDelta);
+
+ if (LOG.isInfoEnabled()) {
+ if (tmpSegmentFile == null) {
+ LOG.info(
+ "Segment file removed (all entries are truncated)
[path = {}, log size freed = {}, new log size = {}].",
+ segmentFile.path(), logSizeDelta, newLogSize
+ );
+ } else {
+ LOG.info(
+ "Segment file compacted [path = {}, log size freed
= {}, new log size = {}].",
+ segmentFile.path(), logSizeDelta, newLogSize
+ );
+ }
+ }
+ } finally {
+ if (tmpSegmentFile != null) {
+ tmpSegmentFile.close();
+ }
+ }
+ }
+
+ private static void writeFully(WritableByteChannel channel, ByteBuffer
buffer) throws IOException {
+ while (buffer.hasRemaining()) {
+ channel.write(buffer);
+ }
+ }
+
+ private class TmpSegmentFile implements ManuallyCloseable {
+ private final String fileName;
+
+ private final Path tmpFilePath;
+
+ private final FileChannel fileChannel;
+
+ private final FileProperties fileProperties;
+
+ TmpSegmentFile(SegmentFile originalFile) throws IOException {
+ FileProperties originalFileProperties =
originalFile.fileProperties();
+
+ this.fileProperties = new
FileProperties(originalFileProperties.ordinal(),
originalFileProperties.generation() + 1);
+ this.fileName = SegmentFile.fileName(fileProperties);
+ this.tmpFilePath = segmentFilesDir.resolve(fileName +
TMP_FILE_SUFFIX);
+ this.fileChannel = FileChannel.open(tmpFilePath, CREATE_NEW,
WRITE);
+ }
+
+ void writeHeader() throws IOException {
+ fileChannel.write(ByteBuffer.wrap(HEADER_RECORD));
+ }
+
+ FileChannel fileChannel() {
+ return fileChannel;
+ }
+
+ void syncAndRename() throws IOException {
+ fileChannel.force(true);
+
+ atomicMoveFile(tmpFilePath, tmpFilePath.resolveSibling(fileName),
LOG);
+ }
+
+ long size() throws IOException {
+ return fileChannel.size();
+ }
+
+ FileProperties fileProperties() {
+ return fileProperties;
+ }
+
+ @Override
+ public void close() throws IOException {
+ fileChannel.close();
+ }
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
index 86ed55fac0b..d008ddb4ac0 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java
@@ -17,13 +17,16 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
@@ -77,9 +80,10 @@ class SegmentFile implements ManuallyCloseable {
/** Lock used to atomically execute fsync. */
private final Object syncLock = new Object();
- private SegmentFile(RandomAccessFile file, Path path, boolean isSync)
throws IOException {
- //noinspection ChannelOpenedButNotSafelyClosed
- buffer = file.getChannel().map(MapMode.READ_WRITE, 0, file.length());
+ private SegmentFile(FileChannel channel, Path path, boolean isSync) throws
IOException {
+ buffer = channel.map(MapMode.READ_WRITE, 0, channel.size());
+
+ assert buffer.limit() > 0 : "File " + path + " is empty.";
this.path = path;
this.isSync = isSync;
@@ -97,20 +101,17 @@ class SegmentFile implements ManuallyCloseable {
throw new IllegalArgumentException("File size is too big: " +
fileSize);
}
+ // Using the RandomAccessFile for its "setLength" method.
try (var file = new RandomAccessFile(path.toFile(), "rw")) {
file.setLength(fileSize);
- return new SegmentFile(file, path, isSync);
+ return new SegmentFile(file.getChannel(), path, isSync);
}
}
static SegmentFile openExisting(Path path, boolean isSync) throws
IOException {
- if (!Files.exists(path)) {
- throw new IllegalArgumentException("File does not exist: " + path);
- }
-
- try (var file = new RandomAccessFile(path.toFile(), "rw")) {
- return new SegmentFile(file, path, isSync);
+ try (var channel = FileChannel.open(path, READ, WRITE)) {
+ return new SegmentFile(channel, path, isSync);
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 9f50c7598d0..cf1195a1621 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.raft.storage.segstore;
import static java.lang.Math.toIntExact;
import static
org.apache.ignite.internal.raft.configuration.LogStorageConfigurationSchema.UNSPECIFIED_MAX_LOG_ENTRY_SIZE;
import static
org.apache.ignite.internal.raft.configuration.LogStorageConfigurationSchema.computeDefaultMaxLogEntrySizeBytes;
-import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFile.fileName;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.RESET_RECORD_SIZE;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.TRUNCATE_PREFIX_RECORD_SIZE;
@@ -31,6 +30,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
@@ -129,6 +129,8 @@ class SegmentFileManager implements ManuallyCloseable {
private final IndexFileManager indexFileManager;
+ private final RaftLogGarbageCollector garbageCollector;
+
/** Configured size of a segment file. */
private final int segmentFileSize;
@@ -157,6 +159,7 @@ class SegmentFileManager implements ManuallyCloseable {
Path baseDir,
int stripes,
FailureProcessor failureProcessor,
+ GroupInfoProvider groupInfoProvider,
RaftConfiguration raftConfiguration,
LogStorageConfiguration storageConfiguration
) throws IOException {
@@ -180,14 +183,15 @@ class SegmentFileManager implements ManuallyCloseable {
failureProcessor,
logStorageView.maxCheckpointQueueSize()
);
+
+ garbageCollector = new RaftLogGarbageCollector(segmentFilesDir,
indexFileManager, groupInfoProvider);
}
void start() throws IOException {
LOG.info("Starting segment file manager [segmentFilesDir={},
fileSize={}].", segmentFilesDir, segmentFileSize);
- indexFileManager.cleanupTmpFiles();
-
- var payloadParser = new SegmentPayloadParser();
+ indexFileManager.cleanupLeftoverFiles();
+ garbageCollector.cleanupLeftoverFiles();
Path lastSegmentFilePath = null;
@@ -204,10 +208,11 @@ class SegmentFileManager implements ManuallyCloseable {
// Create missing index files.
FileProperties segmentFileProperties =
SegmentFile.fileProperties(segmentFilePath);
- if
(!indexFileManager.indexFileExists(segmentFileProperties)) {
+ // TODO: we may want to load the file list into memory,
see https://issues.apache.org/jira/browse/IGNITE-27961
+ if
(!Files.exists(indexFileManager.indexFilePath(segmentFileProperties))) {
LOG.info("Creating missing index file for segment file
{}.", segmentFilePath);
- SegmentFileWithMemtable segmentFileWithMemtable =
recoverSegmentFile(segmentFilePath, payloadParser);
+ SegmentFileWithMemtable segmentFileWithMemtable =
recoverSegmentFile(segmentFilePath);
indexFileManager.recoverIndexFile(segmentFileWithMemtable.memtable().transitionToReadMode(),
segmentFileProperties);
}
@@ -220,7 +225,7 @@ class SegmentFileManager implements ManuallyCloseable {
} else {
curSegmentFileOrdinal =
SegmentFile.fileProperties(lastSegmentFilePath).ordinal();
-
currentSegmentFile.set(recoverLatestSegmentFile(lastSegmentFilePath,
payloadParser));
+
currentSegmentFile.set(recoverLatestSegmentFile(lastSegmentFilePath));
}
LOG.info("Segment file manager recovery completed. Current segment
file: {}.", lastSegmentFilePath);
@@ -244,8 +249,13 @@ class SegmentFileManager implements ManuallyCloseable {
return indexFileManager;
}
+ @TestOnly
+ RaftLogGarbageCollector garbageCollector() {
+ return garbageCollector;
+ }
+
private SegmentFileWithMemtable allocateNewSegmentFile(int fileOrdinal)
throws IOException {
- Path path = segmentFilesDir.resolve(fileName(new
FileProperties(fileOrdinal)));
+ Path path = segmentFilesDir.resolve(SegmentFile.fileName(new
FileProperties(fileOrdinal)));
SegmentFile segmentFile = SegmentFile.createNew(path, segmentFileSize,
isSync);
@@ -259,12 +269,12 @@ class SegmentFileManager implements ManuallyCloseable {
* "complete" segment files (i.e. those that have experienced a rollover)
this method is expected to be called on the most recent,
* possibly incomplete segment file.
*/
- private SegmentFileWithMemtable recoverLatestSegmentFile(Path
segmentFilePath, SegmentPayloadParser payloadParser) throws IOException {
+ private SegmentFileWithMemtable recoverLatestSegmentFile(Path
segmentFilePath) throws IOException {
SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath,
isSync);
var memTable = new StripedMemTable(stripes);
- payloadParser.recoverMemtable(segmentFile, memTable, true);
+ SegmentPayloadParser.recoverMemtable(segmentFile, memTable, true);
return new SegmentFileWithMemtable(segmentFile, memTable, false);
}
@@ -276,12 +286,12 @@ class SegmentFileManager implements ManuallyCloseable {
* <p>This method skips CRC validation, because it is used to identify the
end of incomplete segment files (and, by definition, this can
* never happen during this method's invocation), not to validate storage
integrity.
*/
- private SegmentFileWithMemtable recoverSegmentFile(Path segmentFilePath,
SegmentPayloadParser payloadParser) throws IOException {
+ private SegmentFileWithMemtable recoverSegmentFile(Path segmentFilePath)
throws IOException {
SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath,
isSync);
var memTable = new SingleThreadMemTable();
- payloadParser.recoverMemtable(segmentFile, memTable, false);
+ SegmentPayloadParser.recoverMemtable(segmentFile, memTable, false);
return new SegmentFileWithMemtable(segmentFile, memTable, false);
}
@@ -541,20 +551,29 @@ class SegmentFileManager implements ManuallyCloseable {
}
private EntrySearchResult readFromOtherSegmentFiles(long groupId, long
logIndex) throws IOException {
- SegmentFilePointer segmentFilePointer =
indexFileManager.getSegmentFilePointer(groupId, logIndex);
+ while (true) {
+ SegmentFilePointer segmentFilePointer =
indexFileManager.getSegmentFilePointer(groupId, logIndex);
- if (segmentFilePointer == null) {
- return EntrySearchResult.notFound();
- }
+ if (segmentFilePointer == null) {
+ return EntrySearchResult.notFound();
+ }
- Path path =
segmentFilesDir.resolve(fileName(segmentFilePointer.fileProperties()));
+ Path path =
segmentFilesDir.resolve(SegmentFile.fileName(segmentFilePointer.fileProperties()));
- // TODO: Add a cache for recently accessed segment files, see
https://issues.apache.org/jira/browse/IGNITE-26622.
- SegmentFile segmentFile = SegmentFile.openExisting(path, isSync);
+ // TODO: Add a cache for recently accessed segment files, see
https://issues.apache.org/jira/browse/IGNITE-26622.
+ try {
+ SegmentFile segmentFile = SegmentFile.openExisting(path,
isSync);
- ByteBuffer buffer =
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
+ ByteBuffer buffer =
segmentFile.buffer().position(segmentFilePointer.payloadOffset());
- return EntrySearchResult.success(buffer);
+ return EntrySearchResult.success(buffer);
+ } catch (NoSuchFileException e) {
+ // When reading from a segment file based on information from
the index manager, there exists a race with the Garbage
+ // Collector: index manager can return a pointer to a segment
file that may have been compacted. In this case, we should
+ // just retry and get more recent information.
+ LOG.info("Segment file {} not found, retrying.", path);
+ }
+ }
}
private static int maxLogEntrySize(LogStorageView storageConfiguration) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
index 76e9b7477c6..e34bae493b8 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.raft.util.VarlenEncoder;
import org.apache.ignite.internal.util.FastCrc;
class SegmentPayloadParser {
- void recoverMemtable(SegmentFile segmentFile, WriteModeIndexMemTable
memtable, boolean validateCrc) {
+ static void recoverMemtable(SegmentFile segmentFile,
WriteModeIndexMemTable memtable, boolean validateCrc) {
ByteBuffer buffer = segmentFile.buffer();
validateSegmentFileHeader(buffer, segmentFile.path());
@@ -104,7 +104,7 @@ class SegmentPayloadParser {
}
}
- private static void validateSegmentFileHeader(ByteBuffer buffer, Path
segmentFilePath) {
+ static void validateSegmentFileHeader(ByteBuffer buffer, Path
segmentFilePath) {
int magicNumber = buffer.getInt();
if (magicNumber != MAGIC_NUMBER) {
@@ -132,7 +132,7 @@ class SegmentPayloadParser {
return crc == expectedCrc;
}
- private static boolean endOfSegmentReached(ByteBuffer buffer) {
+ static boolean endOfSegmentReached(ByteBuffer buffer) {
if (buffer.remaining() < SWITCH_SEGMENT_RECORD.length) {
return true;
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
index 797e545d2a4..235e2e2063b 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.raft.storage.segstore;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.either;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import org.apache.ignite.internal.lang.RunnableX;
@@ -277,4 +280,80 @@ class GroupIndexMetaTest extends BaseIgniteAbstractTest {
assertThat(groupMeta.indexMeta(19), is(nullValue()));
assertThat(groupMeta.firstLogIndexInclusive(), is(-1L));
}
+
+ @Test
+ void testOnIndexCompacted() {
+ var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0));
+ var meta2 = new IndexFileMeta(50, 100, 42, new FileProperties(1));
+ var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(2));
+
+ var groupMeta = new GroupIndexMeta(meta1);
+ groupMeta.addIndexMeta(meta2);
+ groupMeta.addIndexMeta(meta3);
+
+ var compactedMeta2 = new IndexFileMeta(50, 100, 42, new
FileProperties(1, 1));
+ groupMeta.onIndexCompacted(new FileProperties(1), compactedMeta2);
+
+ assertThat(groupMeta.indexMeta(1), is(meta1));
+ assertThat(groupMeta.indexMeta(50), is(compactedMeta2));
+ assertThat(groupMeta.indexMeta(100), is(meta3));
+ }
+
+ @Test
+ void testOnIndexCompactedWithMultipleBlocks() {
+ // meta1 is in block 0.
+ var meta1 = new IndexFileMeta(1, 100, 0, new FileProperties(0));
+ var groupMeta = new GroupIndexMeta(meta1);
+
+ // meta2 overlaps meta1, creating a second block in the deque.
+ var meta2 = new IndexFileMeta(42, 100, 42, new FileProperties(1));
+ groupMeta.addIndexMeta(meta2);
+
+ // meta3 is added to the second block (consecutive to meta2).
+ var meta3 = new IndexFileMeta(100, 200, 84, new FileProperties(2));
+ groupMeta.addIndexMeta(meta3);
+
+ // Compact meta1 from the older block.
+ var compactedMeta1 = new IndexFileMeta(1, 100, 0, new
FileProperties(0, 1));
+ groupMeta.onIndexCompacted(new FileProperties(0), compactedMeta1);
+
+ assertThat(groupMeta.indexMeta(1), is(compactedMeta1));
+ assertThat(groupMeta.indexMeta(42), is(meta2));
+ assertThat(groupMeta.indexMeta(100), is(meta3));
+
+ // Compact meta3 from the newer block.
+ var compactedMeta3 = new IndexFileMeta(100, 200, 84, new
FileProperties(2, 1));
+ groupMeta.onIndexCompacted(new FileProperties(2), compactedMeta3);
+
+ assertThat(groupMeta.indexMeta(1), is(compactedMeta1));
+ assertThat(groupMeta.indexMeta(42), is(meta2));
+ assertThat(groupMeta.indexMeta(100), is(compactedMeta3));
+ }
+
+ @RepeatedTest(100)
+ void multithreadCompactionWithTruncatePrefix() {
+ var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0));
+ var meta2 = new IndexFileMeta(42, 100, 42, new FileProperties(1));
+ var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(2));
+
+ var compactedMeta2 = new IndexFileMeta(42, 100, 42, new
FileProperties(1, 1));
+
+ var groupMeta = new GroupIndexMeta(meta1);
+ groupMeta.addIndexMeta(meta2);
+ groupMeta.addIndexMeta(meta3);
+
+ RunnableX compactionTask = () -> groupMeta.onIndexCompacted(new
FileProperties(1), compactedMeta2);
+
+ RunnableX truncateTask = () -> groupMeta.truncatePrefix(43);
+
+ RunnableX readTask = () -> {
+ IndexFileMeta indexFileMeta = groupMeta.indexMeta(51);
+
+ assertThat(indexFileMeta, is(notNullValue()));
+ assertThat(indexFileMeta.firstLogIndexInclusive(),
is(anyOf(equalTo(42L), equalTo(43L))));
+ assertThat(indexFileMeta.indexFileProperties().generation(),
is(anyOf(equalTo(0), equalTo(1))));
+ };
+
+ runRace(compactionTask, truncateTask, readTask);
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
index d46dc5c7c39..6da3b7067a8 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
@@ -23,11 +23,13 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
class IndexFileManagerTest extends IgniteAbstractTest {
@@ -379,8 +381,8 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testExists() throws IOException {
- assertThat(indexFileManager.indexFileExists(new FileProperties(0)),
is(false));
- assertThat(indexFileManager.indexFileExists(new FileProperties(1)),
is(false));
+ assertThat(Files.exists(indexFileManager.indexFilePath(new
FileProperties(0))), is(false));
+ assertThat(Files.exists(indexFileManager.indexFilePath(new
FileProperties(1))), is(false));
var memtable = new StripedMemTable(STRIPES);
@@ -388,13 +390,13 @@ class IndexFileManagerTest extends IgniteAbstractTest {
indexFileManager.saveNewIndexMemtable(memtable);
- assertThat(indexFileManager.indexFileExists(new FileProperties(0)),
is(true));
- assertThat(indexFileManager.indexFileExists(new FileProperties(1)),
is(false));
+ assertThat(Files.exists(indexFileManager.indexFilePath(new
FileProperties(0))), is(true));
+ assertThat(Files.exists(indexFileManager.indexFilePath(new
FileProperties(1))), is(false));
indexFileManager.saveNewIndexMemtable(memtable);
- assertThat(indexFileManager.indexFileExists(new FileProperties(0)),
is(true));
- assertThat(indexFileManager.indexFileExists(new FileProperties(1)),
is(true));
+ assertThat(Files.exists(indexFileManager.indexFilePath(new
FileProperties(0))), is(true));
+ assertThat(Files.exists(indexFileManager.indexFilePath(new
FileProperties(1))), is(true));
}
@Test
@@ -556,4 +558,56 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.firstLogIndexInclusive(0), is(2L));
assertThat(indexFileManager.lastLogIndexExclusive(0), is(3L));
}
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-27980")
+ @Test
+ void testCompactionWithMissingGroups() throws IOException {
+ var memtable = new SingleThreadMemTable();
+
+ // First file contains entries from two groups.
+ memtable.appendSegmentFileOffset(0, 1, 1);
+ memtable.appendSegmentFileOffset(1, 1, 1);
+
+ indexFileManager.saveNewIndexMemtable(memtable);
+
+ memtable = new SingleThreadMemTable();
+
+ // Second file contains entries only from the first group.
+ memtable.appendSegmentFileOffset(0, 2, 2);
+
+ indexFileManager.saveNewIndexMemtable(memtable);
+
+ memtable = new SingleThreadMemTable();
+
+ // Third file contains entries from two groups again.
+ memtable.appendSegmentFileOffset(0, 3, 3);
+ memtable.appendSegmentFileOffset(0, 4, 4);
+ memtable.appendSegmentFileOffset(1, 2, 2);
+
+ indexFileManager.saveNewIndexMemtable(memtable);
+
+ // Truncate prefix of the group 0 so that one of the entries from the
third file are removed.
+ memtable = new SingleThreadMemTable();
+
+ memtable.truncatePrefix(0, 4);
+
+ indexFileManager.saveNewIndexMemtable(memtable);
+
+ var compactedMemtable = new SingleThreadMemTable();
+
+ // We are removing an entry for group 0 from the third file.
+ compactedMemtable.appendSegmentFileOffset(1, 2, 2);
+
+ indexFileManager.onIndexFileCompacted(compactedMemtable, new
FileProperties(2, 0), new FileProperties(2, 1));
+
+ assertThat(
+ indexFileManager.getSegmentFilePointer(0, 3),
+ is(nullValue())
+ );
+
+ assertThat(
+ indexFileManager.getSegmentFilePointer(1, 2),
+ is(new SegmentFilePointer(new FileProperties(2, 1), 2))
+ );
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
new file mode 100644
index 00000000000..69b6e0c9f95
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import
org.apache.ignite.internal.raft.storage.segstore.GroupInfoProvider.GroupInfo;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link RaftLogGarbageCollector}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(MockitoExtension.class)
+class RaftLogGarbageCollectorTest extends IgniteAbstractTest {
+ private static final int FILE_SIZE = 200;
+
+ private static final long GROUP_ID_1 = 1000;
+
+ private static final long GROUP_ID_2 = 2000;
+
+ private static final int STRIPES = 10;
+
+ private static final String NODE_NAME = "test";
+
+ @InjectConfiguration
+ private RaftConfiguration raftConfiguration;
+
+ @InjectConfiguration("mock.segmentFileSizeBytes=" + FILE_SIZE)
+ private LogStorageConfiguration storageConfiguration;
+
+ private SegmentFileManager fileManager;
+
+ private IndexFileManager indexFileManager;
+
+ private RaftLogGarbageCollector garbageCollector;
+
+ @Mock
+ private GroupInfoProvider groupInfoProvider;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ fileManager = new SegmentFileManager(
+ NODE_NAME,
+ workDir,
+ STRIPES,
+ new NoOpFailureManager(),
+ groupInfoProvider,
+ raftConfiguration,
+ storageConfiguration
+ );
+
+ fileManager.start();
+
+ indexFileManager = fileManager.indexFileManager();
+
+ garbageCollector = fileManager.garbageCollector();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (fileManager != null) {
+ fileManager.close();
+ }
+ }
+
+ @Test
+ void testCompactSegmentFileWithAllEntriesTruncated() throws Exception {
+ List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ }
+
+ await().until(this::indexFiles, hasSize(greaterThan(0)));
+
+ // This is equivalent to prefix truncation up to the latest index.
+ when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new
GroupInfo(batches.size() - 1, batches.size() - 1));
+
+ List<Path> segmentFiles = segmentFiles();
+
+ Path segmentFilePath = segmentFiles.get(0);
+
+ FileProperties fileProperties =
SegmentFile.fileProperties(segmentFilePath);
+
+ SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath,
false);
+ try {
+ garbageCollector.compactSegmentFile(segmentFile);
+ } finally {
+ segmentFile.close();
+ }
+
+ assertThat(Files.exists(segmentFilePath), is(false));
+
assertThat(Files.exists(indexFileManager.indexFilePath(fileProperties)),
is(false));
+
+ // Validate that no files with increased generation have been created.
+ var newFileProperties = new FileProperties(fileProperties.ordinal(),
fileProperties.generation() + 1);
+
+ assertThat(segmentFiles(), hasSize(segmentFiles.size() - 1));
+
assertThat(Files.exists(fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties))),
is(false));
+
assertThat(Files.exists(indexFileManager.indexFilePath(newFileProperties)),
is(false));
+ }
+
+ @Test
+ void testCompactSegmentFileWithSomeEntriesTruncated() throws Exception {
+ List<byte[]> batches = createRandomData(FILE_SIZE / 8, 10);
+
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ appendBytes(GROUP_ID_2, batches.get(i), i);
+ }
+
+ await().until(this::indexFiles, hasSize(greaterThan(0)));
+
+ List<Path> segmentFiles = segmentFiles();
+
+ long originalSize = Files.size(segmentFiles.get(0));
+
+ // Emulate truncation of one group
+ when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new
GroupInfo(batches.size() - 1, batches.size() - 1));
+ when(groupInfoProvider.groupInfo(GROUP_ID_2)).thenReturn(new
GroupInfo(1, batches.size() - 1));
+
+ Path firstSegmentFile = segmentFiles.get(0);
+
+ FileProperties originalFileProperties =
SegmentFile.fileProperties(firstSegmentFile);
+
+ SegmentFile segmentFile = SegmentFile.openExisting(firstSegmentFile,
false);
+ try {
+ garbageCollector.compactSegmentFile(segmentFile);
+ } finally {
+ segmentFile.close();
+ }
+
+ assertThat(Files.exists(firstSegmentFile), is(false));
+
+ var newFileProperties = new
FileProperties(originalFileProperties.ordinal(),
originalFileProperties.generation() + 1);
+
+ Path newSegmentFile =
fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties));
+
+ assertThat(Files.exists(newSegmentFile), is(true));
+
+ assertThat(Files.size(newSegmentFile), lessThan(originalSize));
+
+
assertThat(Files.exists(indexFileManager.indexFilePath(newFileProperties)),
is(true));
+
assertThat(Files.exists(indexFileManager.indexFilePath(originalFileProperties)),
is(false));
+ }
+
+ @Test
+ void testCompactSegmentFileWithTruncationRecords() throws Exception {
+ List<byte[]> batches = createRandomData(FILE_SIZE / 4, 5);
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ }
+
+ fileManager.truncatePrefix(GROUP_ID_1, 2);
+ fileManager.truncateSuffix(GROUP_ID_1, 3);
+
+ await().until(this::indexFiles, hasSize(greaterThan(0)));
+
+ List<Path> segmentFiles = segmentFiles();
+ assertThat(segmentFiles, hasSize(greaterThan(1)));
+
+ // This is equivalent to prefix truncation up to the latest index.
+ when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new
GroupInfo(batches.size() - 1, batches.size() - 1));
+
+ Path firstSegmentFile = segmentFiles.get(0);
+
+ SegmentFile segmentFile = SegmentFile.openExisting(firstSegmentFile,
false);
+ try {
+ garbageCollector.compactSegmentFile(segmentFile);
+ } finally {
+ segmentFile.close();
+ }
+
+ assertThat(Files.exists(firstSegmentFile), is(false));
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-27964")
+ @RepeatedTest(10)
+ void testConcurrentCompactionAndReads() throws Exception {
+ List<byte[]> batches = createRandomData(FILE_SIZE / 10, 50);
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ }
+
+ await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size()
- 1)));
+
+ var firstAliveIndex = new AtomicLong();
+ var gcTaskDone = new AtomicBoolean(false);
+
+ when(groupInfoProvider.groupInfo(GROUP_ID_1))
+ .thenAnswer(invocationOnMock -> new
GroupInfo(firstAliveIndex.get(), batches.size() - 1));
+
+ RunnableX gcTask = () -> {
+ try {
+ List<Path> segmentFiles = segmentFiles();
+
+ Path lastSegmentFile = segmentFiles.get(segmentFiles.size() -
1);
+
+ // Don't compact the last segment file.
+ while (!segmentFiles.get(0).equals(lastSegmentFile)) {
+ long index = firstAliveIndex.incrementAndGet();
+
+ fileManager.truncatePrefix(GROUP_ID_1, index);
+
+ SegmentFile segmentFile =
SegmentFile.openExisting(segmentFiles.get(0), false);
+ try {
+ garbageCollector.compactSegmentFile(segmentFile);
+ } finally {
+ segmentFile.close();
+ }
+
+ segmentFiles = segmentFiles();
+ }
+ } finally {
+ gcTaskDone.set(true);
+ }
+ };
+
+ RunnableX readTask = () -> {
+ while (!gcTaskDone.get()) {
+ for (int i = 0; i < batches.size(); i++) {
+ int index = i;
+
+ fileManager.getEntry(GROUP_ID_1, i, bs -> {
+ if (bs != null) {
+ assertThat(bs, is(batches.get(index)));
+ }
+ return null;
+ });
+ }
+ }
+ };
+
+ runRace(gcTask, readTask, readTask, readTask);
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-27964")
+ @RepeatedTest(10)
+ void testConcurrentCompactionAndReadsFromMultipleGroups() throws Exception
{
+ List<byte[]> batches = createRandomData(FILE_SIZE / 10, 50);
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ appendBytes(GROUP_ID_2, batches.get(i), i);
+ }
+
+ await().until(this::indexFiles, hasSize(equalTo(segmentFiles().size()
- 1)));
+
+ var firstAliveIndex = new AtomicLong();
+ var gcTaskDone = new AtomicBoolean(false);
+
+ when(groupInfoProvider.groupInfo(GROUP_ID_1))
+ .thenAnswer(invocationOnMock -> new
GroupInfo(firstAliveIndex.get(), batches.size() - 1));
+
+ // Group 2 is never compacted.
+ when(groupInfoProvider.groupInfo(GROUP_ID_2))
+ .thenAnswer(invocationOnMock -> new GroupInfo(0,
batches.size() - 1));
+
+ RunnableX gcTask = () -> {
+ try {
+ List<Path> segmentFiles = segmentFiles();
+
+ Path curSegmentFilePath = segmentFiles.get(0);
+
+ // Unlike in the similar test, segment files will never be
removed completely due to the presence of the second group.
+ // Because of that we need to use more complex logic to
iterate over the segment files.
+ int segmentFilesIndex = 0;
+
+ while (true) {
+ long index = firstAliveIndex.incrementAndGet();
+
+ fileManager.truncatePrefix(GROUP_ID_1, index);
+
+ FileProperties fileProperties =
SegmentFile.fileProperties(curSegmentFilePath);
+
+ long sizeBeforeCompaction = Files.size(curSegmentFilePath);
+
+ SegmentFile segmentFile =
SegmentFile.openExisting(curSegmentFilePath, false);
+ try {
+ garbageCollector.compactSegmentFile(segmentFile);
+ } finally {
+ segmentFile.close();
+ }
+
+ FileProperties newFileProperties = new
FileProperties(fileProperties.ordinal(), fileProperties.generation() + 1);
+
+ curSegmentFilePath =
fileManager.segmentFilesDir().resolve(SegmentFile.fileName(newFileProperties));
+
+ long sizeAfterCompaction = Files.size(curSegmentFilePath);
+
+ // If the files' size didn't change, there's nothing left
to compact, we can switch to the next segment.
+ if (sizeAfterCompaction == sizeBeforeCompaction) {
+ segmentFilesIndex++;
+
+ // Don't compact the last segment file.
+ if (segmentFilesIndex == segmentFiles.size() - 1) {
+ break;
+ }
+
+ curSegmentFilePath =
segmentFiles.get(segmentFilesIndex);
+ }
+ }
+ } finally {
+ gcTaskDone.set(true);
+ }
+ };
+
+ RunnableX readTaskFromGroup1 = () -> {
+ while (!gcTaskDone.get()) {
+ for (int i = 0; i < batches.size(); i++) {
+ int index = i;
+
+ fileManager.getEntry(GROUP_ID_1, i, bs -> {
+ if (bs != null) {
+ assertThat(bs, is(batches.get(index)));
+ }
+ return null;
+ });
+ }
+ }
+ };
+
+ RunnableX readTaskFromGroup2 = () -> {
+ while (!gcTaskDone.get()) {
+ for (int i = 0; i < batches.size(); i++) {
+ int index = i;
+
+ fileManager.getEntry(GROUP_ID_2, i, bs -> {
+ if (bs != null) {
+ assertThat(bs, is(batches.get(index)));
+ }
+ return null;
+ });
+ }
+ }
+ };
+
+ runRace(gcTask, readTaskFromGroup1, readTaskFromGroup1,
readTaskFromGroup2, readTaskFromGroup2);
+ }
+
+ @Test
+ void testRecoveryAfterCompaction() throws Exception {
+ List<byte[]> batches = createRandomData(FILE_SIZE / 8, 10);
+
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ appendBytes(GROUP_ID_2, batches.get(i), i);
+ }
+
+ fileManager.truncatePrefix(GROUP_ID_1, batches.size() - 1);
+
+ await().until(this::indexFiles, hasSize(greaterThan(0)));
+
+ List<Path> segmentFiles = segmentFiles();
+
+ // Emulate truncation of one group
+ when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new
GroupInfo(batches.size() - 1, batches.size() - 1));
+ when(groupInfoProvider.groupInfo(GROUP_ID_2)).thenReturn(new
GroupInfo(1, batches.size() - 1));
+
+ SegmentFile segmentFile =
SegmentFile.openExisting(segmentFiles.get(0), false);
+ try {
+ garbageCollector.compactSegmentFile(segmentFile);
+ } finally {
+ segmentFile.close();
+ }
+
+ fileManager.close();
+
+ fileManager = new SegmentFileManager(
+ NODE_NAME,
+ workDir,
+ STRIPES,
+ new NoOpFailureManager(),
+ groupInfoProvider,
+ raftConfiguration,
+ storageConfiguration
+ );
+
+ fileManager.start();
+
+ for (int i = 0; i < batches.size(); i++) {
+ int index = i;
+
+ fileManager.getEntry(GROUP_ID_1, i, bs -> {
+ assertThat(index, is(batches.size() - 1));
+ assertThat(bs, is(batches.get(index)));
+
+ return null;
+ });
+
+ fileManager.getEntry(GROUP_ID_2, i, bs -> {
+ assertThat(bs, is(batches.get(index)));
+ return null;
+ });
+ }
+ }
+
+ @Test
+ void testCleanupLeftoverFilesOnRecovery() throws Exception {
+ // Create temporary segment files
+ Path tmpFile1 =
fileManager.segmentFilesDir().resolve("segment-0000000001-0000000000.bin.tmp");
+ Path tmpFile2 =
fileManager.segmentFilesDir().resolve("segment-0000000002-0000000001.bin.tmp");
+
+ Files.createFile(tmpFile1);
+ Files.createFile(tmpFile2);
+
+ // Create duplicate segment and index files with same ordinal but
different generations
+ Path segmentFile1Gen0 =
fileManager.segmentFilesDir().resolve("segment-0000000003-0000000000.bin");
+ Path segmentFile1Gen1 =
fileManager.segmentFilesDir().resolve("segment-0000000003-0000000001.bin");
+ Path indexFile1Gen0 =
fileManager.indexFileManager().indexFilesDir().resolve("index-0000000003-0000000000.bin");
+ Path indexFile1Gen1 =
fileManager.indexFileManager().indexFilesDir().resolve("index-0000000003-0000000001.bin");
+
+ Files.createFile(segmentFile1Gen0);
+ Files.createFile(segmentFile1Gen1);
+ Files.createFile(indexFile1Gen0);
+ Files.createFile(indexFile1Gen1);
+
+ // Create orphaned index files (no corresponding segment file)
+ Path orphanedIndexFile =
fileManager.indexFileManager().indexFilesDir().resolve("index-0000000099-0000000000.bin");
+
+ Files.createFile(orphanedIndexFile);
+
+ fileManager.close();
+
+ fileManager = new SegmentFileManager(
+ NODE_NAME,
+ workDir,
+ STRIPES,
+ new NoOpFailureManager(),
+ groupInfoProvider,
+ raftConfiguration,
+ storageConfiguration
+ );
+
+ fileManager.garbageCollector().cleanupLeftoverFiles();
+
+ // Verify temporary files are cleaned up
+ assertFalse(Files.exists(tmpFile1));
+ assertFalse(Files.exists(tmpFile2));
+
+ // Verify duplicate segment files are cleaned up (lower generation
removed)
+ assertFalse(Files.exists(segmentFile1Gen0));
+ assertTrue(Files.exists(segmentFile1Gen1));
+ assertFalse(Files.exists(indexFile1Gen0));
+ assertTrue(Files.exists(indexFile1Gen1));
+
+ // Verify orphaned index files are cleaned up
+ assertFalse(Files.exists(orphanedIndexFile));
+ }
+
+ private List<Path> segmentFiles() throws IOException {
+ try (Stream<Path> files = Files.list(fileManager.segmentFilesDir())) {
+ return files
+ .filter(p -> !p.getFileName().toString().endsWith(".tmp"))
+ .sorted()
+ .collect(Collectors.toList());
+ }
+ }
+
+ private List<Path> indexFiles() throws IOException {
+ try (Stream<Path> files = Files.list(fileManager.indexFilesDir())) {
+ return files
+ .filter(p -> !p.getFileName().toString().endsWith(".tmp"))
+ .sorted()
+ .collect(Collectors.toList());
+ }
+ }
+
+ private static List<byte[]> createRandomData(int batchLength, int
numBatches) {
+ return IntStream.range(0, numBatches)
+ .mapToObj(i -> randomBytes(ThreadLocalRandom.current(),
batchLength))
+ .collect(Collectors.toList());
+ }
+
+ private void appendBytes(long groupId, byte[] serializedEntry, long index)
throws IOException {
+ var entry = new LogEntry();
+ entry.setId(new LogId(index, 0));
+
+ fileManager.appendEntry(groupId, entry, new LogEntryEncoder() {
+ @Override
+ public byte[] encode(LogEntry log) {
+ return serializedEntry;
+ }
+
+ @Override
+ public void encode(ByteBuffer buffer, LogEntry log) {
+ buffer.put(serializedEntry);
+ }
+
+ @Override
+ public int size(LogEntry logEntry) {
+ return serializedEntry.length;
+ }
+ });
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
index 3debc97b513..27b182a7124 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java
@@ -87,6 +87,7 @@ class SegmentFileManagerGetEntryTest extends
IgniteAbstractTest {
workDir,
STRIPES,
new NoOpFailureManager(),
+ groupId -> null,
raftConfiguration,
storageConfiguration
);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index 9bf3099fb7e..a11a96588db 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static java.util.Collections.emptyIterator;
import static java.util.Comparator.comparingLong;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.runAsync;
@@ -118,6 +119,7 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
workDir,
STRIPES,
failureManager,
+ GroupInfoProvider.NO_OP,
raftConfiguration,
storageConfiguration
);
@@ -489,7 +491,9 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
// Use a mock memtable that throws an exception to force the index
manager to create a temporary index file, but not rename it.
ReadModeIndexMemTable mockMemTable = mock(ReadModeIndexMemTable.class);
- when(mockMemTable.iterator()).thenThrow(new RuntimeException("Test
exception"));
+ when(mockMemTable.iterator())
+ .thenReturn(emptyIterator())
+ .thenThrow(new RuntimeException("Test exception"));
// Create a tmp file for the incomplete segment file.
try {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
index bd053f0468d..60cfafe065c 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileTest.java
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
@@ -112,7 +113,7 @@ class SegmentFileTest extends IgniteAbstractTest {
@Test
void testOpenExistingConstructorInvariants() throws IOException {
- assertThrows(IllegalArgumentException.class, () ->
SegmentFile.openExisting(path, false));
+ assertThrows(NoSuchFileException.class, () ->
SegmentFile.openExisting(path, false));
createSegmentFile(1);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
index 98053c13787..ca7ce4c605d 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageConcurrencyTest.java
@@ -61,6 +61,7 @@ class SegstoreLogStorageConcurrencyTest extends
IgniteAbstractTest {
workDir,
1,
new NoOpFailureManager(),
+ GroupInfoProvider.NO_OP,
raftConfiguration,
storageConfiguration
);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index c058d46078d..9009e122163 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -64,6 +64,7 @@ class SegstoreLogStorageTest extends BaseLogStorageTest {
path,
1,
new NoOpFailureManager(),
+ GroupInfoProvider.NO_OP,
raftConfiguration,
storageConfiguration
);