This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 226cb5e5d29 IGNITE-26281 Implement index file meta for the new log
storage (#6661)
226cb5e5d29 is described below
commit 226cb5e5d298c46614c9968bfc958b541e14750d
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Sep 30 21:35:53 2025 +0300
IGNITE-26281 Implement index file meta for the new log storage (#6661)
---
.../raft/storage/segstore/GroupIndexMeta.java | 81 +++++++++++++
.../internal/raft/storage/segstore/IndexFile.java | 64 -----------
.../raft/storage/segstore/IndexFileManager.java | 128 ++++++++++++++++++---
.../raft/storage/segstore/IndexFileMeta.java | 77 +++++++++++++
.../raft/storage/segstore/IndexFileMetaArray.java | 97 ++++++++++++++++
.../raft/storage/segstore/IndexFilePointer.java | 37 ++++++
.../raft/storage/segstore/RaftLogCheckpointer.java | 4 +-
.../raft/storage/segstore/SegmentFileManager.java | 14 +--
.../raft/storage/segstore/SegmentFilePointer.java | 37 ++++++
.../raft/storage/segstore/SegmentInfo.java | 10 +-
.../raft/storage/segstore/ByteChannelUtils.java | 39 +++++++
.../storage/segstore/DeserializedIndexFile.java | 43 +++----
.../segstore/DeserializedSegmentPayload.java | 87 +++++---------
.../raft/storage/segstore/GroupIndexMetaTest.java | 107 +++++++++++++++++
.../storage/segstore/IndexFileManagerTest.java | 103 +++++++++++------
.../storage/segstore/IndexFileMetaArrayTest.java | 93 +++++++++++++++
.../storage/segstore/RaftLogCheckpointerTest.java | 7 +-
.../storage/segstore/SegmentFileManagerTest.java | 42 ++++---
.../storage/segstore/SegstoreLogStorageTest.java | 17 +--
19 files changed, 841 insertions(+), 246 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
new file mode 100644
index 00000000000..66b1891f845
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents in-memory meta information about a particular Raft group stored in an index file.
+ *
+ * <p>Wraps an {@link IndexFileMetaArray}. The element at array position {@code i} is reported by {@link #indexFilePointer} as
+ * belonging to the index file with ordinal {@code startFileOrdinal + i}.
+ *
+ * <p>NOTE(review): that mapping presumably relies on exactly one meta being added for this group per consecutive index file,
+ * starting from {@code startFileOrdinal}; confirm against the callers of {@code addIndexMeta}.
+ *
+ * <p>The array reference is kept in a {@code volatile} field and is replaced through a {@link VarHandle} compare-and-set. Readers
+ * take a single snapshot of the reference and work on that snapshot.
+ */
+class GroupIndexMeta {
+ private static final VarHandle FILE_METAS_VH; // Handle to the "fileMetas" field, resolved once in the static initializer below.
+
+ static {
+ try {
+ FILE_METAS_VH =
MethodHandles.lookup().findVarHandle(GroupIndexMeta.class, "fileMetas",
IndexFileMetaArray.class);
+ } catch (ReflectiveOperationException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ /**
+ * Ordinal number of the first index file in the group, i.e. of the file described by the first element of {@code fileMetas}.
+ */
+ private final int startFileOrdinal;
+
+ @SuppressWarnings("FieldMayBeFinal") // Updated through a VarHandle.
+ private volatile IndexFileMetaArray fileMetas;
+
+ GroupIndexMeta(int startFileOrdinal, IndexFileMeta startFileMeta) {
+ this.startFileOrdinal = startFileOrdinal;
+ this.fileMetas = new IndexFileMetaArray(startFileMeta);
+ }
+
+ void addIndexMeta(IndexFileMeta indexFileMeta) {
+ IndexFileMetaArray fileMetas = this.fileMetas; // Snapshot that also serves as the expected value of the compare-and-set below.
+
+ IndexFileMetaArray newFileMetas = fileMetas.add(indexFileMeta);
+
+ // Simple assignment would suffice, since we only have one thread
writing to this field, but we use compareAndSet to verify
+ // this invariant, just in case.
+ boolean updated = FILE_METAS_VH.compareAndSet(this, fileMetas,
newFileMetas);
+
+ assert updated : "Concurrent writes detected"; // Only evaluated when assertions are enabled.
+ }
+
+ /**
+ * Returns a file pointer that uniquely identifies the index file for the given log index. Returns {@code null} if the given log
+ * index is not found in any of the index files in this group.
+ *
+ * <p>The ordinal in the returned pointer is {@code startFileOrdinal} plus the array position of the matching meta.
+ */
+ @Nullable
+ IndexFilePointer indexFilePointer(long logIndex) {
+ IndexFileMetaArray fileMetas = this.fileMetas; // Single volatile read, so find() and get() below work on the same instance.
+
+ int arrayIndex = fileMetas.find(logIndex);
+
+ if (arrayIndex < 0) {
+ return null;
+ }
+
+ IndexFileMeta meta = fileMetas.get(arrayIndex);
+
+ return new IndexFilePointer(startFileOrdinal + arrayIndex, meta);
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFile.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFile.java
deleted file mode 100644
index a3004c79d67..00000000000
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFile.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.raft.storage.segstore;
-
-import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
-import static org.apache.ignite.internal.util.IgniteUtils.fsyncFile;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-
-/**
- * Represents an index file create by an {@link IndexFileManager}.
- *
- * <p>Not thread-safe.
- */
-class IndexFile {
- private static final IgniteLogger LOG = Loggers.forClass(IndexFile.class);
-
- private final String name;
-
- private Path path;
-
- IndexFile(String name, Path path) {
- this.name = name;
- this.path = path;
- }
-
- void syncAndRename() throws IOException {
- fsyncFile(path);
-
- path = atomicMoveFile(path, path.getParent().resolve(name), LOG);
- }
-
- /** Returns the name of the index file. */
- String name() {
- return name;
- }
-
- /**
- * Returns the current path to the index file.
- *
- * <p>The file is originally created as a temporary file until it gets
renamed by calling {@link #syncAndRename}.
- */
- Path path() {
- return path;
- }
-}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
index 0a23acd12f9..79e2dd58889 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java
@@ -19,16 +19,25 @@ package org.apache.ignite.internal.raft.storage.segstore;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
+import static org.apache.ignite.internal.util.IgniteUtils.fsyncFile;
import java.io.BufferedOutputStream;
+import java.io.EOFException;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
import java.util.Iterator;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.jetbrains.annotations.Nullable;
/**
* File manager responsible for persisting {@link ReadModeIndexMemTable}s to
index files.
@@ -68,10 +77,15 @@ import java.util.Map.Entry;
*
+-------------------------------------------------------------------------+-----+
* </pre>
*
+ * <p>Index File Manager is also responsible for maintaining an in-memory
cache of persisted index files' metadata for quicker index file
+ * lookup.
+ *
* @see ReadModeIndexMemTable
* @see SegmentFileManager
*/
class IndexFileManager {
+ private static final IgniteLogger LOG =
Loggers.forClass(IndexFileManager.class);
+
static final int MAGIC_NUMBER = 0x6BF0A76A;
static final int FORMAT_VERSION = 1;
@@ -89,11 +103,16 @@ class IndexFileManager {
private final Path baseDir;
/**
- * Current index file index (used to generate index file names).
+ * Current index file ordinal (used to generate index file names).
*
* <p>No synchronized access is needed because this field is only used by
the checkpoint thread.
*/
- private int curFileIndex = 0;
+ private int curFileOrdinal = 0;
+
+ /**
+ * Index file metadata grouped by Raft Group ID.
+ */
+ private final Map<Long, GroupIndexMeta> groupIndexMetas = new
ConcurrentHashMap<>();
IndexFileManager(Path baseDir) {
this.baseDir = baseDir;
@@ -101,16 +120,16 @@ class IndexFileManager {
/**
* Saves the given index memtable to a file.
- *
- * <p>The file is saved into a temporary location and is expected to be
later renamed using {@link IndexFile#syncAndRename}.
*/
- IndexFile saveIndexMemtable(ReadModeIndexMemTable indexMemTable) throws
IOException {
- String fileName = indexFileName(curFileIndex++, 0);
+ Path saveIndexMemtable(ReadModeIndexMemTable indexMemTable) throws
IOException {
+ String fileName = indexFileName(curFileOrdinal, 0);
- Path path = baseDir.resolve(fileName + ".tmp");
+ Path tmpFilePath = baseDir.resolve(fileName + ".tmp");
- try (OutputStream os = new
BufferedOutputStream(Files.newOutputStream(path, CREATE_NEW, WRITE))) {
- os.write(header(indexMemTable));
+ try (var os = new
BufferedOutputStream(Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE))) {
+ byte[] headerBytes = serializeHeaderAndFillMetadata(indexMemTable);
+
+ os.write(headerBytes);
Iterator<Entry<Long, SegmentInfo>> it = indexMemTable.iterator();
@@ -119,10 +138,60 @@ class IndexFileManager {
}
}
- return new IndexFile(fileName, path);
+ curFileOrdinal++;
+
+ return syncAndRename(tmpFilePath,
tmpFilePath.resolveSibling(fileName));
}
- private static byte[] header(ReadModeIndexMemTable indexMemTable) {
+ /**
+ * Returns a pointer into a segment file that contains the entry for the given group's index. Returns {@code null} if the given
+ * log index could not be found in any of the index files.
+ *
+ * <p>The lookup first consults the in-memory {@code groupIndexMetas} cache to find the index file and the group's payload offset
+ * in it, then opens that index file and reads a single {@code int} at
+ * {@code indexFilePayloadOffset + (logIndex - firstLogIndex) * Integer.BYTES}. The file name is built with generation {@code 0}.
+ *
+ * <p>NOTE(review): metadata appears to be put into the cache while the header is being serialized, i.e. before the index file is
+ * written and renamed to its final name; a concurrent call may therefore fail to open the file. Confirm whether readers can run
+ * concurrently with {@code saveIndexMemtable}.
+ *
+ * <p>NOTE(review): the returned pointer carries the index file's ordinal; presumably segment files and index files share
+ * ordinals. Confirm.
+ *
+ * @param groupId Raft group ID.
+ * @param logIndex Raft log index to look up.
+ * @return Pointer built from the index file ordinal and the offset read from the index file, or {@code null} if not found.
+ * @throws IOException If the index file cannot be opened or read; an {@link EOFException} is thrown if the file ends before the
+ *     four offset bytes have been read.
+ */
+ @Nullable
+ SegmentFilePointer getSegmentFilePointer(long groupId, long logIndex)
throws IOException {
+ GroupIndexMeta groupIndexMeta = groupIndexMetas.get(groupId);
+
+ if (groupIndexMeta == null) {
+ return null;
+ }
+
+ IndexFilePointer filePointer =
groupIndexMeta.indexFilePointer(logIndex);
+
+ if (filePointer == null) {
+ return null;
+ }
+
+ Path indexFile =
baseDir.resolve(indexFileName(filePointer.fileOrdinal(), 0));
+
+ IndexFileMeta fileMeta = filePointer.fileMeta();
+
+ // Index file payload is a 0-based array, whose indices correspond to
the [fileMeta.firstLogIndex, fileMeta.lastLogIndex] range.
+ long payloadArrayIndex = logIndex - fileMeta.firstLogIndex();
+
+ assert payloadArrayIndex >= 0 : payloadArrayIndex;
+
+ long payloadOffset = fileMeta.indexFilePayloadOffset() +
payloadArrayIndex * Integer.BYTES;
+
+ try (SeekableByteChannel channel = Files.newByteChannel(indexFile,
StandardOpenOption.READ)) {
+ channel.position(payloadOffset);
+
+ ByteBuffer segmentPayloadOffsetBuffer =
ByteBuffer.allocate(Integer.BYTES).order(BYTE_ORDER);
+
+ while (segmentPayloadOffsetBuffer.hasRemaining()) { // A single read() may fill the buffer only partially, hence the loop.
+ int bytesRead = channel.read(segmentPayloadOffsetBuffer);
+
+ if (bytesRead == -1) {
+ throw new EOFException("EOF reached while reading index
file: " + indexFile);
+ }
+ }
+
+ int segmentPayloadOffset = segmentPayloadOffsetBuffer.getInt(0); // Absolute read: the buffer's position is at its limit here.
+
+ return new SegmentFilePointer(filePointer.fileOrdinal(),
segmentPayloadOffset);
+ }
+ }
+
+ private byte[] serializeHeaderAndFillMetadata(ReadModeIndexMemTable
indexMemTable) {
int numGroups = indexMemTable.numGroups();
int headerSize = headerSize(numGroups);
@@ -140,16 +209,25 @@ class IndexFileManager {
while (it.hasNext()) {
Entry<Long, SegmentInfo> entry = it.next();
- long groupId = entry.getKey();
+ // Using the boxed value to avoid unnecessary autoboxing later.
+ Long groupId = entry.getKey();
SegmentInfo segmentInfo = entry.getValue();
+ long firstLogIndex = segmentInfo.firstLogIndex();
+
+ long lastLogIndex = segmentInfo.lastLogIndex();
+
+ var indexFileMeta = new IndexFileMeta(firstLogIndex, lastLogIndex,
payloadOffset);
+
+ putIndexFileMeta(groupId, indexFileMeta);
+
headerBuffer
.putLong(groupId)
.putInt(0) // Flags.
.putInt(payloadOffset)
- .putLong(segmentInfo.firstLogIndex())
- .putLong(segmentInfo.lastLogIndex());
+ .putLong(firstLogIndex)
+ .putLong(lastLogIndex);
payloadOffset += payloadSize(segmentInfo);
}
@@ -157,6 +235,22 @@ class IndexFileManager {
return headerBuffer.array();
}
+ private void putIndexFileMeta(Long groupId, IndexFileMeta indexFileMeta) { // Records the meta in the in-memory per-group cache.
+ GroupIndexMeta existingGroupIndexMeta = groupIndexMetas.get(groupId);
+ // NOTE(review): get-then-put is not atomic; presumably fine because only the checkpoint thread writes here - confirm.
+ if (existingGroupIndexMeta == null) {
+ groupIndexMetas.put(groupId, new GroupIndexMeta(curFileOrdinal,
indexFileMeta));
+ } else {
+ existingGroupIndexMeta.addIndexMeta(indexFileMeta);
+ }
+ }
+
+ private static Path syncAndRename(Path from, Path to) throws IOException { // Fsyncs "from" first, then moves it to "to".
+ fsyncFile(from);
+
+ return atomicMoveFile(from, to, LOG); // The result of atomicMoveFile is returned to the caller unchanged.
+ }
+
private static byte[] payload(SegmentInfo segmentInfo) {
ByteBuffer payloadBuffer =
ByteBuffer.allocate(payloadSize(segmentInfo)).order(BYTE_ORDER);
@@ -173,7 +267,7 @@ class IndexFileManager {
return segmentInfo.size() * Integer.BYTES;
}
- private static String indexFileName(int fileIndex, int generation) {
- return String.format(INDEX_FILE_NAME_FORMAT, fileIndex, generation);
+ private static String indexFileName(int fileOrdinal, int generation) {
+ return String.format(INDEX_FILE_NAME_FORMAT, fileOrdinal, generation);
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
new file mode 100644
index 00000000000..d9038e54cb1
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMeta.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+/**
+ * Meta information about a payload in an index file.
+ *
+ * <p>Immutable holder of three values: the first and last log index covered by the payload (both inclusive) and the offset of
+ * the payload inside the index file. Two instances are equal when all three values are equal.
+ *
+ * @see IndexFileManager
+ */
+class IndexFileMeta {
+ private final long firstLogIndex;
+
+ private final long lastLogIndex;
+
+ private final int indexFilePayloadOffset;
+
+ IndexFileMeta(long firstLogIndex, long lastLogIndex, int
indexFilePayloadOffset) {
+ this.firstLogIndex = firstLogIndex;
+ this.lastLogIndex = lastLogIndex;
+ this.indexFilePayloadOffset = indexFilePayloadOffset;
+ }
+
+ /**
+ * Returns the inclusive lower bound of log indices stored in the index file for the Raft Group.
+ */
+ long firstLogIndex() {
+ return firstLogIndex;
+ }
+
+ /**
+ * Returns the inclusive upper bound of log indices stored in the index file for the Raft Group.
+ */
+ long lastLogIndex() {
+ return lastLogIndex;
+ }
+
+ /**
+ * Returns the offset of the payload for the Raft Group in the index file.
+ */
+ int indexFilePayloadOffset() {
+ return indexFilePayloadOffset;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) { // Exact class match is required; null never equals.
+ return false;
+ }
+
+ IndexFileMeta fileMeta = (IndexFileMeta) o;
+ return firstLogIndex == fileMeta.firstLogIndex && lastLogIndex ==
fileMeta.lastLogIndex
+ && indexFilePayloadOffset == fileMeta.indexFilePayloadOffset;
+ }
+
+ @Override
+ public int hashCode() { // Combines the same three fields that equals() compares.
+ int result = Long.hashCode(firstLogIndex);
+ result = 31 * result + Long.hashCode(lastLogIndex);
+ result = 31 * result + indexFilePayloadOffset;
+ return result;
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
new file mode 100644
index 00000000000..2a171a704a2
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.util.Arrays;
+
+/**
+ * An array of {@link IndexFileMeta}.
+ *
+ * <p>Reads from multiple threads are thread-safe, but writes are expected to be done from a single thread only.
+ *
+ * <p>Each instance has a fixed {@code size}; {@link #add} does not modify the instance it is called on but returns a new
+ * instance with {@code size + 1}. The new instance may share the backing array with the old one: a new backing array (twice as
+ * long) is allocated only when the current one is full.
+ *
+ * <p>Added metas are expected to be contiguous in terms of log indices (the first log index of the added meta must follow the
+ * last log index of the previous one). This is only checked by an assertion, and {@link #find} relies on this ordering.
+ */
+class IndexFileMetaArray {
+ static final int INITIAL_CAPACITY = 10; // Length of the backing array allocated by the single-element constructor.
+
+ private final IndexFileMeta[] array; // Backing array; may be shared with other instances, only the first "size" slots are read.
+
+ private final int size; // Number of elements visible through this instance.
+
+ IndexFileMetaArray(IndexFileMeta initialMeta) {
+ this.array = new IndexFileMeta[INITIAL_CAPACITY];
+ this.array[0] = initialMeta;
+
+ this.size = 1;
+ }
+
+ private IndexFileMetaArray(IndexFileMeta[] array, int size) {
+ this.array = array;
+ this.size = size;
+ }
+
+ IndexFileMetaArray add(IndexFileMeta indexFileMeta) {
+ assert indexFileMeta.firstLogIndex() == array[size - 1].lastLogIndex()
+ 1 :
+ String.format("Index File Metas must be contiguous. Expected
log index: %d, actual log index: %d",
+ array[size - 1].lastLogIndex() + 1,
+ indexFileMeta.firstLogIndex()
+ );
+
+ // The array can be shared between multiple instances, but since it
always grows and we read at most "size" elements,
+ // we don't need to copy it every time.
+ IndexFileMeta[] array = this.array;
+
+ if (size == array.length) { // Backing array is full: continue in a copy that is twice as long.
+ array = Arrays.copyOf(array, array.length * 2);
+ }
+
+ array[size] = indexFileMeta; // Written past this instance's "size", so it becomes visible only through the new instance.
+
+ return new IndexFileMetaArray(array, size + 1);
+ }
+
+ IndexFileMeta get(int arrayIndex) { // No check against "size" is made; callers are expected to pass an index below size().
+ return array[arrayIndex];
+ }
+
+ int size() {
+ return size;
+ }
+
+ /**
+ * Returns the array index of the {@link IndexFileMeta} containing the given Raft log index or {@code -1} if no such meta exists.
+ *
+ * <p>Implemented as a binary search over the first {@code size} elements, comparing the log index against the inclusive
+ * {@code [firstLogIndex, lastLogIndex]} range of each probed meta.
+ */
+ int find(long logIndex) {
+ int lowArrayIndex = 0;
+ int highArrayIndex = size - 1;
+
+ while (lowArrayIndex <= highArrayIndex) {
+ int middleArrayIndex = (lowArrayIndex + highArrayIndex) >>> 1; // Unsigned shift keeps the midpoint correct on int overflow.
+
+ IndexFileMeta midValue = array[middleArrayIndex];
+
+ if (logIndex < midValue.firstLogIndex()) {
+ highArrayIndex = middleArrayIndex - 1;
+ } else if (logIndex > midValue.lastLogIndex()) {
+ lowArrayIndex = middleArrayIndex + 1;
+ } else {
+ return middleArrayIndex;
+ }
+ }
+
+ return -1;
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFilePointer.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFilePointer.java
new file mode 100644
index 00000000000..34f54a660e9
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFilePointer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+class IndexFilePointer { // Immutable pair: the ordinal of an index file and the IndexFileMeta that matched a lookup in it.
+ private final int fileOrdinal; // Ordinal of the index file; IndexFileManager builds the file name from it.
+
+ private final IndexFileMeta fileMeta; // Meta (log index range and payload offset) of the matching payload in that file.
+
+ IndexFilePointer(int fileOrdinal, IndexFileMeta fileMeta) {
+ this.fileOrdinal = fileOrdinal;
+ this.fileMeta = fileMeta;
+ }
+
+ int fileOrdinal() {
+ return fileOrdinal;
+ }
+
+ IndexFileMeta fileMeta() {
+ return fileMeta;
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
index 4ee22066142..6c9b6688fc5 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java
@@ -101,11 +101,9 @@ class RaftLogCheckpointer {
entry.segmentFile().sync();
- IndexFile indexFile =
indexFileManager.saveIndexMemtable(entry.memTable());
+ indexFileManager.saveIndexMemtable(entry.memTable());
queue.removeHead();
-
- indexFile.syncAndRename();
} catch (InterruptedException | ClosedByInterruptException e) {
// Interrupt is called on stop.
return;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index b52e82e57fc..5c479ef5bcb 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -116,11 +116,11 @@ class SegmentFileManager implements ManuallyCloseable {
private final Object rolloverLock = new Object();
/**
- * Current segment file index (used to generate segment file names).
+ * Current segment file ordinal (used to generate segment file names).
*
* <p>Must always be accessed under the {@link #rolloverLock}.
*/
- private int curFileIndex;
+ private int curSegmentFileOrdinal;
/**
* Flag indicating whether the file manager has been stopped.
@@ -149,8 +149,8 @@ class SegmentFileManager implements ManuallyCloseable {
currentSegmentFile.set(allocateNewSegmentFile(0));
}
- private SegmentFile allocateNewSegmentFile(int fileIndex) throws
IOException {
- Path path = baseDir.resolve(segmentFileName(fileIndex, 0));
+ private SegmentFile allocateNewSegmentFile(int fileOrdinal) throws
IOException {
+ Path path = baseDir.resolve(segmentFileName(fileOrdinal, 0));
var segmentFile = new SegmentFile(path, fileSize, 0);
@@ -159,8 +159,8 @@ class SegmentFileManager implements ManuallyCloseable {
return segmentFile;
}
- private static String segmentFileName(int fileIndex, int generation) {
- return String.format(SEGMENT_FILE_NAME_FORMAT, fileIndex, generation);
+ private static String segmentFileName(int fileOrdinal, int generation) {
+ return String.format(SEGMENT_FILE_NAME_FORMAT, fileOrdinal,
generation);
}
void appendEntry(long groupId, LogEntry entry, LogEntryEncoder encoder)
throws IOException {
@@ -245,7 +245,7 @@ class SegmentFileManager implements ManuallyCloseable {
throw new IgniteInternalException(NODE_STOPPING_ERR);
}
- SegmentFile newFile = allocateNewSegmentFile(++curFileIndex);
+ SegmentFile newFile =
allocateNewSegmentFile(++curSegmentFileOrdinal);
checkpointer.onRollover(observedSegmentFile,
memTable.transitionToReadMode());
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFilePointer.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFilePointer.java
new file mode 100644
index 00000000000..09341e1e9df
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFilePointer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+class SegmentFilePointer { // Immutable pair: a file ordinal and a payload offset.
+ private final int fileOrdinal; // NOTE(review): presumably the segment file ordinal; IndexFileManager passes the index file's ordinal.
+
+ private final int payloadOffset; // Offset value; IndexFileManager fills it with the int read from the index file payload.
+
+ SegmentFilePointer(int fileOrdinal, int payloadOffset) {
+ this.fileOrdinal = fileOrdinal;
+ this.payloadOffset = payloadOffset;
+ }
+
+ int fileOrdinal() {
+ return fileOrdinal;
+ }
+
+ int payloadOffset() {
+ return payloadOffset;
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
index 64ff40a45e4..2e22002b7db 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
@@ -77,7 +77,7 @@ class SegmentInfo {
}
/**
- * Base log index. All log indexes in this memtable lie in the {@code
[logIndexBase, logIndexBase + segmentFileOffsets.size]} range.
+ * Base log index. All log indexes in this memtable lie in the {@code
[logIndexBase, logIndexBase + segmentFileOffsets.size)} range.
*/
private final long logIndexBase;
@@ -138,10 +138,10 @@ class SegmentInfo {
}
/**
- * Returns the non-inclusive upper bound of log indices stored in this
memtable.
+ * Returns the inclusive upper bound of log indices stored in this
memtable.
*/
long lastLogIndex() {
- return logIndexBase + segmentFileOffsets.size();
+ return logIndexBase + segmentFileOffsets.size() - 1;
}
/**
@@ -157,8 +157,6 @@ class SegmentInfo {
void saveOffsetsTo(ByteBuffer buffer) {
ArrayWithSize offsets = segmentFileOffsets;
- for (int i = 0; i < offsets.size(); i++) {
- buffer.putInt(offsets.get(i));
- }
+ buffer.asIntBuffer().put(offsets.array, 0, offsets.size);
}
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/ByteChannelUtils.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/ByteChannelUtils.java
new file mode 100644
index 00000000000..5708fcf371b
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/ByteChannelUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+class ByteChannelUtils { // Helper for reading a fixed number of bytes from a byte channel.
+ static ByteBuffer readFully(ReadableByteChannel byteChannel, int len)
throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocate(len); // Byte order is left at ByteBuffer's default (big-endian); callers may change it.
+
+ while (buffer.hasRemaining()) { // A single read() may return fewer bytes than requested, so loop until the buffer is full.
+ int bytesRead = byteChannel.read(buffer);
+
+ if (bytesRead == -1) {
+ throw new EOFException("EOF reached while reading byte
channel");
+ }
+ }
+
+ return buffer.rewind(); // Position is reset to 0 so the caller can consume the "len" bytes from the start.
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedIndexFile.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedIndexFile.java
index b63ba757b62..e32532e87ef 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedIndexFile.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedIndexFile.java
@@ -27,15 +27,17 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.Nullable;
/**
- * Test-only class representing a deserialized {@link IndexFile}.
+ * Test-only class representing a deserialized index file.
*/
class DeserializedIndexFile {
/** groupId -> logIndex -> segmentFileOffset. */
@@ -46,28 +48,21 @@ class DeserializedIndexFile {
}
static DeserializedIndexFile fromFile(Path path) throws IOException {
- try (var indexFile = new RandomAccessFile(path.toFile(), "r")) {
+ try (SeekableByteChannel channel = Files.newByteChannel(path)) {
// Read common meta part of the header.
- byte[] commonMetaBytes = new byte[COMMON_META_SIZE];
-
- indexFile.readFully(commonMetaBytes);
-
- ByteBuffer commonMeta =
ByteBuffer.wrap(commonMetaBytes).order(IndexFileManager.BYTE_ORDER);
+ ByteBuffer commonMeta = readFully(channel, COMMON_META_SIZE);
assertThat(commonMeta.getInt(), is(MAGIC_NUMBER));
assertThat(commonMeta.getInt(), is(FORMAT_VERSION));
int numGroups = commonMeta.getInt();
+ // groupId -> logIndex -> segmentFileOffset.
Map<Long, Map<Long, Integer>> content = newHashMap(numGroups);
for (int i = 0; i < numGroups; i++) {
// Read group meta part of the header.
- byte[] groupMetaBytes = new byte[GROUP_META_SIZE];
-
- indexFile.readFully(groupMetaBytes);
-
- ByteBuffer groupMeta =
ByteBuffer.wrap(groupMetaBytes).order(IndexFileManager.BYTE_ORDER);
+ ByteBuffer groupMeta = readFully(channel, GROUP_META_SIZE);
long groupId = groupMeta.getLong();
@@ -80,25 +75,21 @@ class DeserializedIndexFile {
long lastIndex = groupMeta.getLong();
// Read the payload of the group.
- int payloadEntriesNum = (int) (lastIndex - firstIndex);
+ int payloadEntriesNum = (int) (lastIndex - firstIndex + 1);
Map<Long, Integer> logIndexToSegmentFileOffset =
newHashMap(payloadEntriesNum);
content.put(groupId, logIndexToSegmentFileOffset);
- byte[] payloadBytes = new byte[Integer.BYTES *
payloadEntriesNum];
+ long currentPosition = channel.position();
- long currentHeaderOffset = indexFile.getFilePointer();
+ channel.position(indexPayloadOffset);
- indexFile.seek(indexPayloadOffset);
+ ByteBuffer indexPayload = readFully(channel, Integer.BYTES *
payloadEntriesNum);
- indexFile.readFully(payloadBytes);
+ channel.position(currentPosition);
- indexFile.seek(currentHeaderOffset);
-
- ByteBuffer indexPayload =
ByteBuffer.wrap(payloadBytes).order(IndexFileManager.BYTE_ORDER);
-
- for (long logIndex = firstIndex; logIndex < lastIndex;
logIndex++) {
+ for (long logIndex = firstIndex; logIndex <= lastIndex;
logIndex++) {
logIndexToSegmentFileOffset.put(logIndex,
indexPayload.getInt());
}
}
@@ -108,7 +99,7 @@ class DeserializedIndexFile {
}
@Nullable
- Integer getOffset(long groupId, long logIndex) {
+ Integer getSegmentFileOffset(long groupId, long logIndex) {
Map<Long, Integer> logIndexToSegmentFileOffset = content.get(groupId);
return logIndexToSegmentFileOffset == null ? null :
logIndexToSegmentFileOffset.get(logIndex);
@@ -133,6 +124,10 @@ class DeserializedIndexFile {
.collect(toList());
}
+ /**
+ * Reads exactly {@code len} bytes from the channel via
+ * {@link ByteChannelUtils#readFully} and sets the byte order of the
+ * returned buffer to {@link IndexFileManager#BYTE_ORDER}.
+ */
+ private static ByteBuffer readFully(ReadableByteChannel byteChannel, int
len) throws IOException {
+ return ByteChannelUtils.readFully(byteChannel,
len).order(IndexFileManager.BYTE_ORDER);
+ }
+
static class Entry {
private final long groupId;
private final long logIndex;
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
index 8fca92d6a20..4366e261281 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
@@ -25,9 +25,8 @@ import static org.hamcrest.Matchers.is;
import java.io.EOFException;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import org.apache.ignite.internal.util.FastCrc;
import org.jetbrains.annotations.Nullable;
@@ -44,91 +43,57 @@ class DeserializedSegmentPayload {
this.payload = payload;
}
- static DeserializedSegmentPayload fromBytes(byte[] bytes) {
- return fromByteBuffer(wrap(bytes));
- }
-
- static DeserializedSegmentPayload fromByteBuffer(ByteBuffer entryBuf) {
- long groupId = entryBuf.getLong();
-
- int payloadLength = entryBuf.getInt();
-
- byte[] payload = new byte[payloadLength];
-
- entryBuf.get(payload);
-
- int entrySizeWithoutCrc = entryBuf.position();
- int actualCrc = entryBuf.getInt();
- int expectedCrc = FastCrc.calcCrc(entryBuf.rewind(),
entrySizeWithoutCrc);
+ static @Nullable DeserializedSegmentPayload
fromByteChannel(ReadableByteChannel channel) throws IOException {
+ ByteBuffer groupIdBytes;
- assertThat(actualCrc, is(expectedCrc));
-
- return new DeserializedSegmentPayload(groupId, payload);
- }
-
- static @Nullable DeserializedSegmentPayload fromInputStream(InputStream
is) throws IOException {
- byte[] groupIdBytes = is.readNBytes(GROUP_ID_SIZE_BYTES);
-
- if (groupIdBytes.length < GROUP_ID_SIZE_BYTES) {
+ try {
+ groupIdBytes = readFully(channel, GROUP_ID_SIZE_BYTES);
+ } catch (EOFException e) {
// EOF reached.
return null;
}
- long groupId = wrap(groupIdBytes).getLong();
+ long groupId = groupIdBytes.getLong();
if (groupId == 0) {
// EOF reached.
return null;
}
- int payloadLength = wrap(is.readNBytes(LENGTH_SIZE_BYTES)).getInt();
+ int payloadLength = readFully(channel, LENGTH_SIZE_BYTES).getInt();
- byte[] remaining = is.readNBytes(payloadLength + HASH_SIZE);
+ ByteBuffer remaining = readFully(channel, payloadLength + HASH_SIZE);
- ByteBuffer entry = ByteBuffer.allocate(GROUP_ID_SIZE_BYTES +
LENGTH_SIZE_BYTES + payloadLength + HASH_SIZE)
+ ByteBuffer fullEntry = ByteBuffer.allocate(payloadLength +
GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE)
.order(SegmentFile.BYTE_ORDER)
.putLong(groupId)
.putInt(payloadLength)
.put(remaining)
- .flip();
+ .rewind();
- return fromByteBuffer(entry);
- }
+ int crcOffset = payloadLength + GROUP_ID_SIZE_BYTES +
LENGTH_SIZE_BYTES;
- static @Nullable DeserializedSegmentPayload
fromRandomAccessFile(RandomAccessFile file) throws IOException {
- byte[] groupIdBytes = new byte[GROUP_ID_SIZE_BYTES];
+ int actualCrc = fullEntry.getInt(crcOffset);
- try {
- file.readFully(groupIdBytes);
- } catch (EOFException e) {
- return null;
- }
+ int expectedCrc = FastCrc.calcCrc(fullEntry, crcOffset);
- long groupId = wrap(groupIdBytes).getLong();
-
- if (groupId == 0) {
- // EOF reached.
- return null;
- }
+ assertThat(actualCrc, is(expectedCrc));
- byte[] payloadLengthBytes = new byte[LENGTH_SIZE_BYTES];
+ fullEntry.rewind();
- file.readFully(payloadLengthBytes);
+ return fromByteBuffer(fullEntry);
+ }
- int payloadLength = wrap(payloadLengthBytes).getInt();
+ private static DeserializedSegmentPayload fromByteBuffer(ByteBuffer
entryBuf) {
+ long groupId = entryBuf.getLong();
- byte[] remaining = new byte[payloadLength + HASH_SIZE];
+ int payloadLength = entryBuf.getInt();
- file.readFully(remaining);
+ byte[] payload = new byte[payloadLength];
- ByteBuffer entry = ByteBuffer.allocate(GROUP_ID_SIZE_BYTES +
LENGTH_SIZE_BYTES + payloadLength + HASH_SIZE)
- .order(SegmentFile.BYTE_ORDER)
- .putLong(groupId)
- .putInt(payloadLength)
- .put(remaining)
- .flip();
+ entryBuf.get(payload);
- return fromByteBuffer(entry);
+ return new DeserializedSegmentPayload(groupId, payload);
}
long groupId() {
@@ -143,7 +108,7 @@ class DeserializedSegmentPayload {
return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + payload.length +
HASH_SIZE;
}
- private static ByteBuffer wrap(byte[] bytes) {
- return ByteBuffer.wrap(bytes).order(SegmentFile.BYTE_ORDER);
+ private static ByteBuffer readFully(ReadableByteChannel byteChannel, int
len) throws IOException {
+ return ByteChannelUtils.readFully(byteChannel,
len).order(SegmentFile.BYTE_ORDER);
}
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
new file mode 100644
index 00000000000..18de02a6023
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link GroupIndexMeta}: lookups of index file pointers by log
+ * index, both single-threaded and while metas are being added
+ * concurrently.
+ */
+class GroupIndexMetaTest extends BaseIgniteAbstractTest {
+ /**
+ * Adds two metas and checks which pointer (if any) is returned for log
+ * indexes below, inside and above the covered ranges.
+ */
+ @Test
+ void testAddGet() {
+ // NOTE(review): constructor arguments look like (firstLogIndex,
+ // lastLogIndex, <payload offset>) - confirm against IndexFileMeta.
+ var initialMeta = new IndexFileMeta(1, 50, 0);
+
+ // First argument is presumably the ordinal of the first index file
+ // (see startFileOrdinal in the test below).
+ var groupMeta = new GroupIndexMeta(0, initialMeta);
+
+ var additionalMeta = new IndexFileMeta(51, 100, 42);
+
+ groupMeta.addIndexMeta(additionalMeta);
+
+ // Log index 0 precedes the first meta.
+ assertThat(groupMeta.indexFilePointer(0), is(nullValue()));
+
+ IndexFilePointer pointer = groupMeta.indexFilePointer(1);
+
+ assertThat(pointer, is(notNullValue()));
+ assertThat(pointer.fileOrdinal(), is(0));
+ assertThat(pointer.fileMeta(), is(initialMeta));
+
+ pointer = groupMeta.indexFilePointer(66);
+
+ assertThat(pointer, is(notNullValue()));
+ assertThat(pointer.fileOrdinal(), is(1));
+ assertThat(pointer.fileMeta(), is(additionalMeta));
+
+ // Log index 101 follows the last meta.
+ pointer = groupMeta.indexFilePointer(101);
+
+ assertThat(pointer, is(nullValue()));
+ }
+
+ /**
+ * Runs one task that appends metas together with three tasks that look
+ * up every log index, and validates every pointer the lookups return.
+ */
+ @RepeatedTest(10)
+ void testOneWriterMultipleReaders() {
+ int startFileOrdinal = 100;
+
+ int logEntriesPerFile = 50;
+
+ var initialMeta = new IndexFileMeta(0, logEntriesPerFile - 1, 0);
+
+ var groupMeta = new GroupIndexMeta(startFileOrdinal, initialMeta);
+
+ int totalIndexFiles = 1000;
+
+ // Starts from 1 because the meta of the first file was passed to the
+ // constructor above.
+ RunnableX writer = () -> {
+ for (int relativeFileOrdinal = 1; relativeFileOrdinal <
totalIndexFiles; relativeFileOrdinal++) {
+ long startLogIndex = relativeFileOrdinal * logEntriesPerFile;
+ long lastLogIndex = startLogIndex + logEntriesPerFile - 1;
+
+ groupMeta.addIndexMeta(new IndexFileMeta(startLogIndex,
lastLogIndex, 0));
+ }
+ };
+
+ int totalLogEntries = totalIndexFiles * logEntriesPerFile;
+
+ RunnableX reader = () -> {
+ for (int logIndex = 0; logIndex < totalLogEntries; logIndex++) {
+ IndexFilePointer pointer =
groupMeta.indexFilePointer(logIndex);
+
+ // A null pointer is tolerated (presumably the writer has not
+ // added the corresponding meta yet); only non-null results
+ // are validated.
+ if (pointer != null) {
+ int relativeFileOrdinal = logIndex / logEntriesPerFile;
+
+ int expectedFileOrdinal = startFileOrdinal +
relativeFileOrdinal;
+
+ int expectedStartLogIndex = relativeFileOrdinal *
logEntriesPerFile;
+
+ int expectedEndLogIndex = expectedStartLogIndex +
logEntriesPerFile - 1;
+
+ var expectedMeta = new
IndexFileMeta(expectedStartLogIndex, expectedEndLogIndex, 0);
+
+ assertThat(pointer.fileOrdinal(), is(expectedFileOrdinal));
+ assertThat(pointer.fileMeta(), is(expectedMeta));
+ }
+ }
+ };
+
+ // One writer and three readers; runRace presumably executes them
+ // concurrently - confirm against IgniteTestUtils.
+ runRace(writer, reader, reader, reader);
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
index 894ce6e7f4f..29aa8569246 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.raft.storage.segstore;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -39,33 +42,13 @@ class IndexFileManagerTest extends IgniteAbstractTest {
void testIndexFileNaming() throws IOException {
var memtable = new IndexMemTable(4);
- IndexFile indexFile0 = indexFileManager.saveIndexMemtable(memtable);
- IndexFile indexFile1 = indexFileManager.saveIndexMemtable(memtable);
- IndexFile indexFile2 = indexFileManager.saveIndexMemtable(memtable);
+ Path path0 = indexFileManager.saveIndexMemtable(memtable);
+ Path path1 = indexFileManager.saveIndexMemtable(memtable);
+ Path path2 = indexFileManager.saveIndexMemtable(memtable);
- assertThat(indexFile0.name(), is("index-0000000000-0000000000.bin"));
- assertThat(indexFile0.path(),
is(workDir.resolve("index-0000000000-0000000000.bin.tmp")));
-
- assertThat(indexFile1.name(), is("index-0000000001-0000000000.bin"));
- assertThat(indexFile1.path(),
is(workDir.resolve("index-0000000001-0000000000.bin.tmp")));
-
- assertThat(indexFile2.name(), is("index-0000000002-0000000000.bin"));
- assertThat(indexFile2.path(),
is(workDir.resolve("index-0000000002-0000000000.bin.tmp")));
-
- indexFile0.syncAndRename();
-
- assertThat(indexFile0.name(), is("index-0000000000-0000000000.bin"));
- assertThat(indexFile0.path(),
is(workDir.resolve("index-0000000000-0000000000.bin")));
-
- indexFile1.syncAndRename();
-
- assertThat(indexFile1.name(), is("index-0000000001-0000000000.bin"));
- assertThat(indexFile1.path(),
is(workDir.resolve("index-0000000001-0000000000.bin")));
-
- indexFile2.syncAndRename();
-
- assertThat(indexFile2.name(), is("index-0000000002-0000000000.bin"));
- assertThat(indexFile2.path(),
is(workDir.resolve("index-0000000002-0000000000.bin")));
+ assertThat(path0,
is(workDir.resolve("index-0000000000-0000000000.bin")));
+ assertThat(path1,
is(workDir.resolve("index-0000000001-0000000000.bin")));
+ assertThat(path2,
is(workDir.resolve("index-0000000002-0000000000.bin")));
}
@Test
@@ -74,7 +57,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
int entriesPerGroup = 5;
- int[] offsets = IntStream.range(0, numGroups * entriesPerGroup)
+ int[] segmentFileOffsets = IntStream.range(0, numGroups *
entriesPerGroup)
.map(i -> ThreadLocalRandom.current().nextInt())
.toArray();
@@ -84,24 +67,80 @@ class IndexFileManagerTest extends IgniteAbstractTest {
for (int i = 0; i < entriesPerGroup; i++) {
int offsetIndex = (groupId - 1) * entriesPerGroup + i;
- memtable.appendSegmentFileOffset(groupId, i,
offsets[offsetIndex]);
+ memtable.appendSegmentFileOffset(groupId, i,
segmentFileOffsets[offsetIndex]);
}
}
- IndexFile indexFile = indexFileManager.saveIndexMemtable(memtable);
+ Path indexFile = indexFileManager.saveIndexMemtable(memtable);
- DeserializedIndexFile deserializedIndexFile =
DeserializedIndexFile.fromFile(indexFile.path());
+ DeserializedIndexFile deserializedIndexFile =
DeserializedIndexFile.fromFile(indexFile);
for (int groupId = 1; groupId <= numGroups; groupId++) {
for (int i = 0; i < entriesPerGroup; i++) {
int offsetIndex = (groupId - 1) * entriesPerGroup + i;
- int expectedOffset = offsets[offsetIndex];
+ int expectedOffset = segmentFileOffsets[offsetIndex];
- Integer actualOffset =
deserializedIndexFile.getOffset(groupId, i);
+ Integer actualOffset =
deserializedIndexFile.getSegmentFileOffset(groupId, i);
assertThat(actualOffset, is(expectedOffset));
}
}
}
+
+ /**
+ * Saves several memtables holding consecutive log index ranges for
+ * every group and checks that each log index resolves to a pointer with
+ * the expected file ordinal and payload offset.
+ */
+ @Test
+ void testSearchIndexMeta() throws IOException {
+ int numGroups = 10;
+
+ int entriesPerGroup = 5;
+
+ int numMemtables = 5;
+
+ // One random offset per log index; all groups share the same
+ // offsets.
+ int[] segmentFileOffsets = IntStream.range(0, numMemtables *
entriesPerGroup)
+ .map(i -> ThreadLocalRandom.current().nextInt())
+ .toArray();
+
+ for (int memtableIndex = 0; memtableIndex < numMemtables;
memtableIndex++) {
+ var memtable = new IndexMemTable(4);
+
+ for (int groupId = 1; groupId <= numGroups; groupId++) {
+ for (int i = 0; i < entriesPerGroup; i++) {
+ // Each memtable covers the next entriesPerGroup log indexes.
+ int logIndex = memtableIndex * entriesPerGroup + i;
+
+ memtable.appendSegmentFileOffset(groupId, logIndex,
segmentFileOffsets[logIndex]);
+ }
+ }
+
+ indexFileManager.saveIndexMemtable(memtable);
+ }
+
+ for (int memtableIndex = 0; memtableIndex < numMemtables;
memtableIndex++) {
+ for (int groupId = 1; groupId <= numGroups; groupId++) {
+ for (int i = 0; i < entriesPerGroup; i++) {
+ int logIndex = memtableIndex * entriesPerGroup + i;
+
+ SegmentFilePointer pointer =
indexFileManager.getSegmentFilePointer(groupId, logIndex);
+
+ // The file ordinal is expected to match the order in which
+ // the memtables were saved.
+ assertThat(pointer, is(notNullValue()));
+ assertThat(pointer.fileOrdinal(), is(memtableIndex));
+ assertThat(pointer.payloadOffset(),
is(segmentFileOffsets[logIndex]));
+ }
+ }
+ }
+ }
+
+ /**
+ * Checks that lookups return {@code null} before anything is saved and,
+ * after saving a single entry, for any other log index or group.
+ */
+ @Test
+ void testMissingIndexMeta() throws IOException {
+ // Nothing has been saved yet.
+ assertThat(indexFileManager.getSegmentFilePointer(0, 0),
is(nullValue()));
+
+ var memtable = new IndexMemTable(4);
+
+ // Single entry: group 0, log index 0, segment file offset 1.
+ memtable.appendSegmentFileOffset(0, 0, 1);
+
+ indexFileManager.saveIndexMemtable(memtable);
+
+ // Only the saved (group, log index) pair resolves to a pointer.
+ assertThat(indexFileManager.getSegmentFilePointer(0, 0),
is(notNullValue()));
+ assertThat(indexFileManager.getSegmentFilePointer(0, 1),
is(nullValue()));
+ assertThat(indexFileManager.getSegmentFilePointer(1, 0),
is(nullValue()));
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
new file mode 100644
index 00000000000..4ff361e1708
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArrayTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static
org.apache.ignite.internal.raft.storage.segstore.IndexFileMetaArray.INITIAL_CAPACITY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link IndexFileMetaArray}: appending elements and finding
+ * the element that covers a given log index.
+ */
+class IndexFileMetaArrayTest extends BaseIgniteAbstractTest {
+ /**
+ * Appends more than {@code INITIAL_CAPACITY} elements and checks the
+ * reported size and the elements returned by {@code get}.
+ */
+ @Test
+ void testAddGet() {
+ var initialMeta = new IndexFileMeta(1, 1, 0);
+
+ var array = new IndexFileMetaArray(initialMeta);
+
+ assertThat(array.size(), is(1));
+ assertThat(array.get(0), is(initialMeta));
+
+ var meta2 = new IndexFileMeta(2, 2, 0);
+
+ // add() returns the array instance to be used from now on.
+ array = array.add(meta2);
+
+ assertThat(array.size(), is(2));
+ assertThat(array.get(1), is(meta2));
+
+ // Add INITIAL_CAPACITY more elements with consecutive log indexes,
+ // presumably to force the array to grow beyond its initial capacity.
+ for (int i = 0; i < INITIAL_CAPACITY; i++) {
+ long logIndex = meta2.firstLogIndex() + i + 1;
+
+ array = array.add(new IndexFileMeta(logIndex, logIndex, 0));
+ }
+
+ var meta3 = new IndexFileMeta(INITIAL_CAPACITY + 3, INITIAL_CAPACITY +
3, 0);
+
+ array = array.add(meta3);
+
+ // 2 elements added up front + INITIAL_CAPACITY from the loop + meta3.
+ assertThat(array.size(), is(3 + INITIAL_CAPACITY));
+ assertThat(array.get(array.size() - 1), is(meta3));
+ }
+
+ /**
+ * Checks that {@code find} returns the position of the element covering
+ * the log index (including both range boundaries) and -1 outside of all
+ * ranges.
+ */
+ @Test
+ void testFindReturnsCorrectIndex() {
+ var meta1 = new IndexFileMeta(1, 10, 100);
+ var meta2 = new IndexFileMeta(11, 20, 200);
+ var meta3 = new IndexFileMeta(21, 30, 300);
+
+ IndexFileMetaArray array = new IndexFileMetaArray(meta1)
+ .add(meta2)
+ .add(meta3);
+
+ // Below the first range.
+ assertThat(array.find(0), is(-1));
+
+ assertThat(array.find(1), is(0));
+ assertThat(array.find(5), is(0));
+ assertThat(array.find(10), is(0));
+
+ assertThat(array.find(11), is(1));
+ assertThat(array.find(15), is(1));
+ assertThat(array.find(20), is(1));
+
+ assertThat(array.find(21), is(2));
+ assertThat(array.find(25), is(2));
+ assertThat(array.find(30), is(2));
+
+ // Above the last range.
+ assertThat(array.find(31), is(-1));
+ }
+
+ /**
+ * Checks that {@code find} returns -1 for log indexes right outside the
+ * range of a single-element array.
+ */
+ @Test
+ void testFindReturnsMinusOneForOutOfRange() {
+ var meta = new IndexFileMeta(100, 200, 1000);
+ var array = new IndexFileMetaArray(meta);
+
+ assertThat(array.find(99), is(-1));
+ assertThat(array.find(201), is(-1));
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
index b70b2908fd5..e9d59efcfec 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
@@ -22,11 +22,9 @@ import static
org.apache.ignite.internal.raft.storage.segstore.RaftLogCheckpoint
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -53,10 +51,7 @@ class RaftLogCheckpointerTest extends BaseIgniteAbstractTest
{
private IndexFileManager indexFileManager;
@BeforeEach
- void setUp(@Mock IndexFile indexFile) throws IOException {
- // To avoid NPE in the checkpoint thread.
- when(indexFileManager.saveIndexMemtable(any())).thenReturn(indexFile);
-
+ void setUp() {
checkpointer = new RaftLogCheckpointer(NODE_NAME, indexFileManager,
new NoOpFailureManager());
checkpointer.start();
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index 0943ad431c7..0b00a848847 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.raft.storage.segstore.ByteChannelUtils.readFully;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.HEADER_RECORD;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.SWITCH_SEGMENT_RECORD;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
@@ -43,8 +44,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.IOException;
import java.io.InputStream;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -169,11 +172,11 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
Path segmentFile = findSoleSegmentFile();
- try (InputStream is = Files.newInputStream(segmentFile)) {
- assertThat(is.readNBytes(HEADER_RECORD.length), is(HEADER_RECORD));
+ try (ByteChannel channel = Files.newByteChannel(segmentFile)) {
+ assertThat(readFully(channel, HEADER_RECORD.length).array(),
is(HEADER_RECORD));
for (byte[] expectedBatch : batches) {
- validateSegmentEntry(is.readNBytes(expectedBatch.length +
SegmentPayload.overheadSize()), expectedBatch);
+ validateSegmentEntry(channel, expectedBatch);
}
}
@@ -198,14 +201,14 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
for (int i = 0; i < batches.size(); i++) {
byte[] expectedBatch = batches.get(i);
- try (InputStream is = Files.newInputStream(segmentFiles.get(i))) {
- assertThat(is.readNBytes(HEADER_RECORD.length),
is(HEADER_RECORD));
+ try (ByteChannel channel =
Files.newByteChannel(segmentFiles.get(i))) {
+ assertThat(readFully(channel, HEADER_RECORD.length).array(),
is(HEADER_RECORD));
- validateSegmentEntry(is.readNBytes(expectedBatch.length +
SegmentPayload.overheadSize()), expectedBatch);
+ validateSegmentEntry(channel, expectedBatch);
if (i != batches.size() - 1) {
// All segment files except the last one must contain a
segment switch record.
- assertThat(is.readNBytes(SWITCH_SEGMENT_RECORD.length),
is(SWITCH_SEGMENT_RECORD));
+ assertThat(readFully(channel,
SWITCH_SEGMENT_RECORD.length).array(), is(SWITCH_SEGMENT_RECORD));
}
}
}
@@ -381,21 +384,23 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
int entrySize = batchLength + SegmentPayload.overheadSize();
for (Path segmentFile : segmentFiles()) {
- try (InputStream is = Files.newInputStream(segmentFile)) {
- assertThat(is.readNBytes(HEADER_RECORD.length),
is(HEADER_RECORD));
+ try (SeekableByteChannel channel =
Files.newByteChannel(segmentFile)) {
+ assertThat(readFully(channel, HEADER_RECORD.length).array(),
is(HEADER_RECORD));
int bytesRead = HEADER_RECORD.length;
while (bytesRead + entrySize < FILE_SIZE && result.size() <
numBatches) {
- byte[] entry = is.readNBytes(entrySize);
+ long position = channel.position();
+
+
result.add(DeserializedSegmentPayload.fromByteChannel(channel));
- result.add(DeserializedSegmentPayload.fromBytes(entry));
+ assertThat(channel.position(), is(position + entrySize));
bytesRead += entrySize;
}
if (FILE_SIZE - bytesRead >= SWITCH_SEGMENT_RECORD.length) {
- assertThat(is.readNBytes(SWITCH_SEGMENT_RECORD.length),
is(SWITCH_SEGMENT_RECORD));
+ assertThat(readFully(channel,
SWITCH_SEGMENT_RECORD.length).array(), is(SWITCH_SEGMENT_RECORD));
}
}
}
@@ -430,9 +435,10 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
});
}
- private static void validateSegmentEntry(byte[] entry, byte[]
expectedPayload) {
- DeserializedSegmentPayload deserializedSegmentPayload =
DeserializedSegmentPayload.fromBytes(entry);
+ private static void validateSegmentEntry(ReadableByteChannel channel,
byte[] expectedPayload) throws IOException {
+ DeserializedSegmentPayload deserializedSegmentPayload =
DeserializedSegmentPayload.fromByteChannel(channel);
+ assertThat(deserializedSegmentPayload, is(notNullValue()));
assertThat(deserializedSegmentPayload.groupId(), is(GROUP_ID));
assertThat(deserializedSegmentPayload.payload(), is(expectedPayload));
}
@@ -440,11 +446,11 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
private static void validateIndexFile(Path indexFilePath, Path
segmentFilePath) throws IOException {
DeserializedIndexFile indexFile =
DeserializedIndexFile.fromFile(indexFilePath);
- try (var segmentFile = new RandomAccessFile(segmentFilePath.toFile(),
"r")) {
+ try (SeekableByteChannel channel =
Files.newByteChannel(segmentFilePath)) {
for (DeserializedIndexFile.Entry indexEntry : indexFile.entries())
{
- segmentFile.seek(indexEntry.segmentFileOffset());
+ channel.position(indexEntry.segmentFileOffset());
- DeserializedSegmentPayload segmentPayload =
DeserializedSegmentPayload.fromRandomAccessFile(segmentFile);
+ DeserializedSegmentPayload segmentPayload =
DeserializedSegmentPayload.fromByteChannel(channel);
assertThat(segmentPayload, is(notNullValue()));
assertThat(segmentPayload.groupId(), is(indexEntry.groupId()));
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index dbc5c1c1c30..265114966db 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -19,19 +19,19 @@ package org.apache.ignite.internal.raft.storage.segstore;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.SWITCH_SEGMENT_RECORD;
-import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.overheadSize;
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -123,12 +123,13 @@ class SegstoreLogStorageTest extends IgniteAbstractTest {
assertThat(segmentFiles, hasSize(1));
- try (InputStream is = Files.newInputStream(segmentFiles.get(0))) {
+ try (SeekableByteChannel channel =
Files.newByteChannel(segmentFiles.get(0))) {
// Skip header.
- is.readNBytes(SegmentFileManager.HEADER_RECORD.length);
+ channel.position(channel.position() +
SegmentFileManager.HEADER_RECORD.length);
- DeserializedSegmentPayload entry =
DeserializedSegmentPayload.fromBytes(is.readNBytes(overheadSize() +
payload.length));
+ DeserializedSegmentPayload entry =
DeserializedSegmentPayload.fromByteChannel(channel);
+ assertThat(entry, is(notNullValue()));
assertThat(entry.groupId(), is(GROUP_ID));
assertThat(entry.payload(), is(payload));
}
@@ -184,14 +185,14 @@ class SegstoreLogStorageTest extends IgniteAbstractTest {
var actualEntries = new
ArrayList<DeserializedSegmentPayload>(payloads.size());
for (Path segmentFile : segmentFiles()) {
- try (InputStream is = Files.newInputStream(segmentFile)) {
+ try (SeekableByteChannel channel =
Files.newByteChannel(segmentFile)) {
// Skip header.
- is.readNBytes(SegmentFileManager.HEADER_RECORD.length);
+ channel.position(channel.position() +
SegmentFileManager.HEADER_RECORD.length);
long bytesRead = SegmentFileManager.HEADER_RECORD.length;
while (bytesRead < SEGMENT_SIZE -
SWITCH_SEGMENT_RECORD.length) {
- DeserializedSegmentPayload entry =
DeserializedSegmentPayload.fromInputStream(is);
+ DeserializedSegmentPayload entry =
DeserializedSegmentPayload.fromByteChannel(channel);
if (entry == null) {
// EOF reached.