This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1f0c6855233 IGNITE-27866 Add single thread memtable to Log Storage
(#7600)
1f0c6855233 is described below
commit 1f0c685523373cdaa5bb51e3edafb1dbbe9202ef
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Feb 17 09:24:42 2026 +0200
IGNITE-27866 Add single thread memtable to Log Storage (#7600)
---
.../{IndexMemTable.java => AbstractMemTable.java} | 107 ++----------------
.../raft/storage/segstore/SegmentFileManager.java | 24 ++--
.../raft/storage/segstore/SegmentInfo.java | 2 +-
.../storage/segstore/SegmentPayloadParser.java | 12 +-
.../storage/segstore/SingleThreadMemTable.java | 45 ++++++++
.../raft/storage/segstore/StripedMemTable.java | 122 +++++++++++++++++++++
...MemTableTest.java => AbstractMemTableTest.java} | 81 +-------------
.../storage/segstore/IndexFileManagerTest.java | 76 ++++++-------
.../storage/segstore/RaftLogCheckpointerTest.java | 10 +-
.../storage/segstore/SingleThreadMemTableTest.java | 25 +++++
.../raft/storage/segstore/StripedMemTableTest.java | 108 ++++++++++++++++++
11 files changed, 368 insertions(+), 244 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTable.java
similarity index 56%
rename from
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
rename to
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTable.java
index 1eaa7bb814b..ad66f091c1b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTable.java
@@ -17,36 +17,15 @@
package org.apache.ignite.internal.raft.storage.segstore;
-import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-class IndexMemTable implements WriteModeIndexMemTable, ReadModeIndexMemTable {
- private static class Stripe {
- /** Map from group ID to SegmentInfo. */
- private final ConcurrentMap<Long, SegmentInfo> memTable = new
ConcurrentHashMap<>();
- }
-
- private final Stripe[] stripes;
-
- IndexMemTable(int stripes) {
- this.stripes = new Stripe[stripes];
-
- for (int i = 0; i < stripes; i++) {
- this.stripes[i] = new Stripe();
- }
- }
+import java.util.Map;
+abstract class AbstractMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
@Override
public void appendSegmentFileOffset(long groupId, long logIndex, int
segmentFileOffset) {
// File offset can be less than 0 (it's treated as an unsigned
integer) but never 0, because of the file header.
assert segmentFileOffset != 0 : String.format("Segment file offset
must not be 0 [groupId=%d]", groupId);
- ConcurrentMap<Long, SegmentInfo> memTable = memtable(groupId);
+ Map<Long, SegmentInfo> memTable = memtable(groupId);
SegmentInfo segmentInfo = memTable.get(groupId);
@@ -74,7 +53,7 @@ class IndexMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
@Override
public void truncateSuffix(long groupId, long lastLogIndexKept) {
- ConcurrentMap<Long, SegmentInfo> memtable = memtable(groupId);
+ Map<Long, SegmentInfo> memtable = memtable(groupId);
memtable.compute(groupId, (id, segmentInfo) -> {
if (segmentInfo == null || lastLogIndexKept <
segmentInfo.firstLogIndexInclusive()) {
@@ -93,7 +72,7 @@ class IndexMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
@Override
public void truncatePrefix(long groupId, long firstIndexKept) {
- ConcurrentMap<Long, SegmentInfo> memtable = memtable(groupId);
+ Map<Long, SegmentInfo> memtable = memtable(groupId);
memtable.compute(groupId, (id, segmentInfo) -> {
if (segmentInfo == null) {
@@ -107,7 +86,7 @@ class IndexMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
@Override
public void reset(long groupId, long nextLogIndex) {
- ConcurrentMap<Long, SegmentInfo> memtable = memtable(groupId);
+ Map<Long, SegmentInfo> memtable = memtable(groupId);
memtable.compute(groupId, (id, segmentInfo) -> {
if (segmentInfo == null || segmentInfo.isPrefixTombstone() ||
nextLogIndex < segmentInfo.firstLogIndexInclusive()) {
@@ -124,77 +103,5 @@ class IndexMemTable implements WriteModeIndexMemTable,
ReadModeIndexMemTable {
return this;
}
- /**
- * {@inheritDoc}
- *
- * <p>This method is not thread-safe wrt concurrent writes, because it is
expected to be used when no writes are happening anymore.
- */
- @Override
- public int numGroups() {
- int result = 0;
-
- for (Stripe stripe : stripes) {
- result += stripe.memTable.size();
- }
-
- return result;
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>This method is not thread-safe wrt concurrent writes, because it is
expected to be used when no writes are happening anymore.
- */
- @Override
- public Iterator<Entry<Long, SegmentInfo>> iterator() {
- return new SegmentInfoIterator();
- }
-
- private Stripe stripe(long groupId) {
- // FIXME: We should calculate stripes the same way it is done in
StripedDisruptor,
- // see https://issues.apache.org/jira/browse/IGNITE-26907
- int stripeIndex = safeAbs(Long.hashCode(groupId) % stripes.length);
-
- return stripes[stripeIndex];
- }
-
- private ConcurrentMap<Long, SegmentInfo> memtable(long groupId) {
- return stripe(groupId).memTable;
- }
-
- private class SegmentInfoIterator implements Iterator<Entry<Long,
SegmentInfo>> {
- private int stripeIndex = 0;
-
- private Iterator<Entry<Long, SegmentInfo>> mapIterator =
refreshIterator();
-
- @Override
- public boolean hasNext() {
- if (mapIterator.hasNext()) {
- return true;
- }
-
- if (stripeIndex < stripes.length) {
- mapIterator = refreshIterator();
-
- return hasNext();
- }
-
- return false;
- }
-
- @Override
- public Entry<Long, SegmentInfo> next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- return mapIterator.next();
- }
-
- private Iterator<Entry<Long, SegmentInfo>> refreshIterator() {
- Stripe nextStripe = stripes[stripeIndex++];
-
- return nextStripe.memTable.entrySet().iterator();
- }
- }
+ protected abstract Map<Long, SegmentInfo> memtable(long groupId);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 1829cc9a3e8..9f50c7598d0 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -187,7 +187,7 @@ class SegmentFileManager implements ManuallyCloseable {
indexFileManager.cleanupTmpFiles();
- var payloadParser = new SegmentPayloadParser(stripes);
+ var payloadParser = new SegmentPayloadParser();
Path lastSegmentFilePath = null;
@@ -251,7 +251,7 @@ class SegmentFileManager implements ManuallyCloseable {
writeHeader(segmentFile);
- return new SegmentFileWithMemtable(segmentFile, new
IndexMemTable(stripes), false);
+ return new SegmentFileWithMemtable(segmentFile, new
StripedMemTable(stripes), false);
}
/**
@@ -260,7 +260,13 @@ class SegmentFileManager implements ManuallyCloseable {
* possibly incomplete segment file.
*/
private SegmentFileWithMemtable recoverLatestSegmentFile(Path
segmentFilePath, SegmentPayloadParser payloadParser) throws IOException {
- return recoverSegmentFile(segmentFilePath, payloadParser, true);
+ SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath,
isSync);
+
+ var memTable = new StripedMemTable(stripes);
+
+ payloadParser.recoverMemtable(segmentFile, memTable, true);
+
+ return new SegmentFileWithMemtable(segmentFile, memTable, false);
}
/**
@@ -271,17 +277,11 @@ class SegmentFileManager implements ManuallyCloseable {
* never happen during this method's invocation), not to validate storage
integrity.
*/
private SegmentFileWithMemtable recoverSegmentFile(Path segmentFilePath,
SegmentPayloadParser payloadParser) throws IOException {
- return recoverSegmentFile(segmentFilePath, payloadParser, false);
- }
-
- private SegmentFileWithMemtable recoverSegmentFile(
- Path segmentFilePath,
- SegmentPayloadParser payloadParser,
- boolean validateCrc
- ) throws IOException {
SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath,
isSync);
- WriteModeIndexMemTable memTable =
payloadParser.recoverMemtable(segmentFile, validateCrc);
+ var memTable = new SingleThreadMemTable();
+
+ payloadParser.recoverMemtable(segmentFile, memTable, false);
return new SegmentFileWithMemtable(segmentFile, memTable, false);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
index cbf2c89ee32..abca1f3f818 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java
@@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
/**
- * Information about a segment file for single Raft Group stored in a {@link
IndexMemTable}.
+ * Information about a segment file for single Raft Group stored in a {@link
AbstractMemTable}.
*
* <p>It consists of a base log index and an array of segment file offsets
which stores in log entry offsets which indices lie in the
* {@code [logIndexBase, logIndexBase + segmentFileOffsets.size)} range.
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
index 332336cc42d..76e9b7477c6 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayloadParser.java
@@ -32,19 +32,11 @@ import org.apache.ignite.internal.raft.util.VarlenEncoder;
import org.apache.ignite.internal.util.FastCrc;
class SegmentPayloadParser {
- private final int stripes;
-
- SegmentPayloadParser(int stripes) {
- this.stripes = stripes;
- }
-
- WriteModeIndexMemTable recoverMemtable(SegmentFile segmentFile, boolean
validateCrc) {
+ void recoverMemtable(SegmentFile segmentFile, WriteModeIndexMemTable
memtable, boolean validateCrc) {
ByteBuffer buffer = segmentFile.buffer();
validateSegmentFileHeader(buffer, segmentFile.path());
- var memtable = new IndexMemTable(stripes);
-
while (!endOfSegmentReached(buffer)) {
int segmentFilePayloadOffset = buffer.position();
@@ -110,8 +102,6 @@ class SegmentPayloadParser {
buffer.position(crcPosition + CRC_SIZE_BYTES);
}
-
- return memtable;
}
private static void validateSegmentFileHeader(ByteBuffer buffer, Path
segmentFilePath) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SingleThreadMemTable.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SingleThreadMemTable.java
new file mode 100644
index 00000000000..ecb925ed737
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SingleThreadMemTable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Not thread-safe memtable implementation designed to be used by a single
thread.
+ */
+class SingleThreadMemTable extends AbstractMemTable {
+ private final Map<Long, SegmentInfo> memtable = new HashMap<>();
+
+ @Override
+ protected Map<Long, SegmentInfo> memtable(long groupId) {
+ return memtable;
+ }
+
+ @Override
+ public Iterator<Entry<Long, SegmentInfo>> iterator() {
+ return memtable.entrySet().iterator();
+ }
+
+ @Override
+ public int numGroups() {
+ return memtable.size();
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/StripedMemTable.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/StripedMemTable.java
new file mode 100644
index 00000000000..8c0b8f6e6d1
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/StripedMemTable.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Thread-safe memtable implementation using group-based stripes to reduce
contention.
+ */
+class StripedMemTable extends AbstractMemTable {
+ private static class Stripe {
+ /** Map from group ID to SegmentInfo. */
+ private final ConcurrentMap<Long, SegmentInfo> memTable = new
ConcurrentHashMap<>();
+ }
+
+ private final Stripe[] stripes;
+
+ StripedMemTable(int stripes) {
+ this.stripes = new Stripe[stripes];
+
+ for (int i = 0; i < stripes; i++) {
+ this.stripes[i] = new Stripe();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>This method is not thread-safe wrt concurrent writes, because it is
expected to be used when no writes are happening anymore.
+ */
+ @Override
+ public int numGroups() {
+ int result = 0;
+
+ for (Stripe stripe : stripes) {
+ result += stripe.memTable.size();
+ }
+
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>This method is not thread-safe wrt concurrent writes, because it is
expected to be used when no writes are happening anymore.
+ */
+ @Override
+ public Iterator<Entry<Long, SegmentInfo>> iterator() {
+ return new SegmentInfoIterator();
+ }
+
+ private Stripe stripe(long groupId) {
+ // FIXME: We should calculate stripes the same way it is done in
StripedDisruptor,
+ // see https://issues.apache.org/jira/browse/IGNITE-26907
+ int stripeIndex = safeAbs(Long.hashCode(groupId) % stripes.length);
+
+ return stripes[stripeIndex];
+ }
+
+ @Override
+ protected Map<Long, SegmentInfo> memtable(long groupId) {
+ return stripe(groupId).memTable;
+ }
+
+ private class SegmentInfoIterator implements Iterator<Entry<Long,
SegmentInfo>> {
+ private int stripeIndex = 0;
+
+ private Iterator<Entry<Long, SegmentInfo>> mapIterator =
refreshIterator();
+
+ @Override
+ public boolean hasNext() {
+ if (mapIterator.hasNext()) {
+ return true;
+ }
+
+ if (stripeIndex < stripes.length) {
+ mapIterator = refreshIterator();
+
+ return hasNext();
+ }
+
+ return false;
+ }
+
+ @Override
+ public Entry<Long, SegmentInfo> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ return mapIterator.next();
+ }
+
+ private Iterator<Entry<Long, SegmentInfo>> refreshIterator() {
+ Stripe nextStripe = stripes[stripeIndex++];
+
+ return nextStripe.memTable.entrySet().iterator();
+ }
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
similarity index 84%
rename from
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
rename to
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
index 03abeed52de..f97031939cf 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTableTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/AbstractMemTableTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.raft.storage.segstore;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.is;
@@ -27,18 +26,16 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map.Entry;
-import org.apache.ignite.internal.lang.RunnableX;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
-class IndexMemTableTest extends BaseIgniteAbstractTest {
- private static final int STRIPES = 10;
+abstract class AbstractMemTableTest<T extends WriteModeIndexMemTable &
ReadModeIndexMemTable> extends BaseIgniteAbstractTest {
+ @SuppressWarnings({"AbstractMethodCallInConstructor",
"OverriddenMethodCallDuringObjectConstruction"})
+ final T memTable = memTable();
- private final IndexMemTable memTable = new IndexMemTable(STRIPES);
+ abstract T memTable();
@Test
void testPutGet() {
@@ -103,76 +100,6 @@ class IndexMemTableTest extends BaseIgniteAbstractTest {
});
}
- @RepeatedTest(10)
- void testOneWriterMultipleReaders() {
- int numItems = 1000;
-
- // One thread writes and two threads read from the same group ID.
- RunnableX writer = () -> {
- for (int i = 0; i < numItems; i++) {
- memTable.appendSegmentFileOffset(0, i, i + 1);
- }
- };
-
- RunnableX reader = () -> {
- for (int i = 0; i < numItems; i++) {
- SegmentInfo segmentInfo = memTable.segmentInfo(0);
-
- if (segmentInfo != null) {
- assertThat(segmentInfo.getOffset(i), either(is(i +
1)).or(is(MISSING_SEGMENT_FILE_OFFSET)));
- }
- }
- };
-
- runRace(writer, reader, reader);
- }
-
- @RepeatedTest(10)
- void testMultithreadedPutGet() {
- int itemsPerGroup = 1000;
-
- var actions = new ArrayList<RunnableX>(STRIPES * 2);
-
- for (int i = 0; i < STRIPES; i++) {
- long groupId = i;
-
- actions.add(() -> {
- for (int j = 0; j < itemsPerGroup; j++) {
- memTable.appendSegmentFileOffset(groupId, j, j + 1);
- }
- });
- }
-
- for (int i = 0; i < STRIPES; i++) {
- long groupId = i;
-
- actions.add(() -> {
- for (int j = 0; j < itemsPerGroup; j++) {
- SegmentInfo segmentInfo = memTable.segmentInfo(groupId);
-
- if (segmentInfo != null) {
- assertThat(segmentInfo.getOffset(j), either(is(j +
1)).or(is(MISSING_SEGMENT_FILE_OFFSET)));
- }
- }
- });
- }
-
- runRace(actions.toArray(RunnableX[]::new));
-
- // Check that all values are present after all writes completed.
- assertThat(memTable.numGroups(), is(STRIPES));
-
- for (int groupId = 0; groupId < STRIPES; groupId++) {
- SegmentInfo segmentInfo = memTable.segmentInfo(groupId);
-
- assertThat(segmentInfo, is(notNullValue()));
-
- for (int j = 0; j < itemsPerGroup; j++) {
- assertThat(segmentInfo.getOffset(j), is(j + 1));
- }
- }
- }
-
@Test
void testTruncateSuffix() {
long groupId0 = 1;
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
index a538602acf1..d46dc5c7c39 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java
@@ -44,7 +44,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testIndexFileNaming() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
Path path0 = indexFileManager.saveNewIndexMemtable(memtable);
Path path1 = indexFileManager.saveNewIndexMemtable(memtable);
@@ -65,7 +65,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
.map(i -> ThreadLocalRandom.current().nextInt())
.toArray();
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
for (int groupId = 1; groupId <= numGroups; groupId++) {
for (int i = 0; i < entriesPerGroup; i++) {
@@ -105,7 +105,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
.toArray();
for (int memtableIndex = 0; memtableIndex < numMemtables;
memtableIndex++) {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
for (int groupId = 1; groupId <= numGroups; groupId++) {
for (int i = 0; i < entriesPerGroup; i++) {
@@ -137,7 +137,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
void testMissingIndexMeta() throws IOException {
assertThat(indexFileManager.getSegmentFilePointer(0, 0),
is(nullValue()));
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 0, 1);
@@ -153,19 +153,19 @@ class IndexFileManagerTest extends IgniteAbstractTest {
*/
@Test
void getSegmentFilePointerWithGroupGaps() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 0, 1);
indexFileManager.saveNewIndexMemtable(memtable);
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(1, 0, 2);
indexFileManager.saveNewIndexMemtable(memtable);
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 3);
@@ -189,7 +189,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testFirstLastLogIndicesIndependence() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
@@ -201,7 +201,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.firstLogIndexInclusive(1), is(-1L));
assertThat(indexFileManager.lastLogIndexExclusive(1), is(-1L));
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(1, 2, 1);
@@ -216,7 +216,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testFirstLastLogIndicesWithTruncateSuffix() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
memtable.appendSegmentFileOffset(0, 2, 1);
@@ -227,7 +227,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
assertThat(indexFileManager.lastLogIndexExclusive(0), is(4L));
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.truncateSuffix(0, 1);
@@ -239,7 +239,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testFirstLastLogIndicesWithTruncatePrefix() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
memtable.appendSegmentFileOffset(0, 2, 1);
@@ -250,7 +250,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
assertThat(indexFileManager.lastLogIndexExclusive(0), is(4L));
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.truncatePrefix(0, 2);
@@ -262,7 +262,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testGetSegmentPointerWithTruncate() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
memtable.appendSegmentFileOffset(0, 2, 2);
@@ -272,7 +272,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new
SegmentFilePointer(new FileProperties(0), 2)));
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.truncateSuffix(0, 1);
@@ -281,7 +281,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.getSegmentFilePointer(0, 1), is(new
SegmentFilePointer(new FileProperties(0), 1)));
assertThat(indexFileManager.getSegmentFilePointer(0, 2),
is(nullValue()));
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 2, 2);
@@ -292,13 +292,13 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testRecovery() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
indexFileManager.saveNewIndexMemtable(memtable);
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 2, 2);
@@ -316,7 +316,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.getSegmentFilePointer(0, 2), is(new
SegmentFilePointer(new FileProperties(1), 2)));
assertThat(indexFileManager.getSegmentFilePointer(0, 3),
is(nullValue()));
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 3, 3);
@@ -329,7 +329,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testRecoveryWithTruncateSuffix() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
memtable.appendSegmentFileOffset(0, 2, 2);
@@ -337,7 +337,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
indexFileManager.saveNewIndexMemtable(memtable);
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.truncateSuffix(0, 2);
@@ -354,7 +354,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testRecoveryWithTruncatePrefix() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
memtable.appendSegmentFileOffset(0, 2, 2);
@@ -362,7 +362,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
indexFileManager.saveNewIndexMemtable(memtable);
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.truncatePrefix(0, 2);
@@ -382,7 +382,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.indexFileExists(new FileProperties(0)),
is(false));
assertThat(indexFileManager.indexFileExists(new FileProperties(1)),
is(false));
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
@@ -399,13 +399,13 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testSaveMemtableWithExplicitOrdinal() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
indexFileManager.recoverIndexFile(memtable, new FileProperties(5));
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 2, 2);
@@ -422,25 +422,25 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testTruncatePrefix() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
indexFileManager.saveNewIndexMemtable(memtable);
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 2, 1);
indexFileManager.saveNewIndexMemtable(memtable);
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 3, 1);
indexFileManager.saveNewIndexMemtable(memtable);
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.truncatePrefix(0, 2);
@@ -453,7 +453,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testCombinationOfPrefixAndSuffixTombstones() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
memtable.appendSegmentFileOffset(0, 2, 2);
@@ -462,7 +462,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
indexFileManager.saveNewIndexMemtable(memtable);
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.truncatePrefix(0, 2);
@@ -488,7 +488,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testReset() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
memtable.appendSegmentFileOffset(0, 2, 2);
@@ -497,7 +497,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
indexFileManager.saveNewIndexMemtable(memtable);
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.reset(0, 2);
@@ -513,7 +513,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testResetTombstone() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
memtable.appendSegmentFileOffset(0, 2, 2);
@@ -522,7 +522,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
indexFileManager.saveNewIndexMemtable(memtable);
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.reset(0, 2);
@@ -536,7 +536,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
@Test
void testFirstLastLogIndicesWithReset() throws IOException {
- var memtable = new IndexMemTable(STRIPES);
+ var memtable = new StripedMemTable(STRIPES);
memtable.appendSegmentFileOffset(0, 1, 1);
memtable.appendSegmentFileOffset(0, 2, 1);
@@ -547,7 +547,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
assertThat(indexFileManager.firstLogIndexInclusive(0), is(1L));
assertThat(indexFileManager.lastLogIndexExclusive(0), is(4L));
- memtable = new IndexMemTable(STRIPES);
+ memtable = new StripedMemTable(STRIPES);
memtable.reset(0, 2);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
index 25c05ebac59..f172899123e 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointerTest.java
@@ -74,7 +74,7 @@ class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
}
@Test
- void testOnRollover(@Mock SegmentFile segmentFile, @Mock IndexMemTable
memTable) throws IOException {
+ void testOnRollover(@Mock SegmentFile segmentFile, @Mock StripedMemTable
memTable) throws IOException {
checkpointer.onRollover(segmentFile, memTable);
verify(segmentFile, timeout(500)).sync();
@@ -84,7 +84,7 @@ class RaftLogCheckpointerTest extends BaseIgniteAbstractTest {
@Test
void testBlockOnRollover(
@Mock SegmentFile segmentFile,
- @Mock IndexMemTable memTable,
+ @Mock StripedMemTable memTable,
@InjectExecutorService(threadCount = 1) ExecutorService executor
) {
var blockFuture = new CompletableFuture<Void>();
@@ -125,7 +125,7 @@ class RaftLogCheckpointerTest extends
BaseIgniteAbstractTest {
when(mockFile.buffer()).thenReturn(buffer);
- IndexMemTable mockMemTable = mock(IndexMemTable.class);
+ StripedMemTable mockMemTable = mock(StripedMemTable.class);
var segmentInfo = new SegmentInfo(i);
@@ -161,7 +161,7 @@ class RaftLogCheckpointerTest extends
BaseIgniteAbstractTest {
}
@Test
- void testFindSegmentPayloadReturnsBufferWhenOffsetPresent(@Mock
SegmentFile mockFile, @Mock IndexMemTable mockMemTable) {
+ void testFindSegmentPayloadReturnsBufferWhenOffsetPresent(@Mock
SegmentFile mockFile, @Mock StripedMemTable mockMemTable) {
var blockFuture = new CompletableFuture<Void>();
try {
@@ -194,7 +194,7 @@ class RaftLogCheckpointerTest extends
BaseIgniteAbstractTest {
}
@Test
- void testFindSegmentPayloadReturnsEmptyWhenPrefixTombstoneCutsOff(@Mock
SegmentFile mockFile, @Mock IndexMemTable mockMemTable) {
+ void testFindSegmentPayloadReturnsEmptyWhenPrefixTombstoneCutsOff(@Mock
SegmentFile mockFile, @Mock StripedMemTable mockMemTable) {
var blockFuture = new CompletableFuture<Void>();
try {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SingleThreadMemTableTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SingleThreadMemTableTest.java
new file mode 100644
index 00000000000..963bbb1053f
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SingleThreadMemTableTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+class SingleThreadMemTableTest extends
AbstractMemTableTest<SingleThreadMemTable> {
+ @Override
+ SingleThreadMemTable memTable() {
+ return new SingleThreadMemTable();
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/StripedMemTableTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/StripedMemTableTest.java
new file mode 100644
index 00000000000..253da1971ef
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/StripedMemTableTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentInfo.MISSING_SEGMENT_FILE_OFFSET;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.either;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.ArrayList;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.junit.jupiter.api.RepeatedTest;
+
+class StripedMemTableTest extends AbstractMemTableTest<StripedMemTable> {
+ private static final int STRIPES = 10;
+
+ @Override
+ StripedMemTable memTable() {
+ return new StripedMemTable(STRIPES);
+ }
+
+ @RepeatedTest(10)
+ void testOneWriterMultipleReaders() {
+ int numItems = 1000;
+
+ // One thread writes and two threads read from the same group ID.
+ RunnableX writer = () -> {
+ for (int i = 0; i < numItems; i++) {
+ memTable.appendSegmentFileOffset(0, i, i + 1);
+ }
+ };
+
+ RunnableX reader = () -> {
+ for (int i = 0; i < numItems; i++) {
+ SegmentInfo segmentInfo = memTable.segmentInfo(0);
+
+ if (segmentInfo != null) {
+ assertThat(segmentInfo.getOffset(i), either(is(i +
1)).or(is(MISSING_SEGMENT_FILE_OFFSET)));
+ }
+ }
+ };
+
+ runRace(writer, reader, reader);
+ }
+
+ @RepeatedTest(10)
+ void testMultithreadedPutGet() {
+ int itemsPerGroup = 1000;
+
+ var actions = new ArrayList<RunnableX>(STRIPES * 2);
+
+ for (int i = 0; i < STRIPES; i++) {
+ long groupId = i;
+
+ actions.add(() -> {
+ for (int j = 0; j < itemsPerGroup; j++) {
+ memTable.appendSegmentFileOffset(groupId, j, j + 1);
+ }
+ });
+ }
+
+ for (int i = 0; i < STRIPES; i++) {
+ long groupId = i;
+
+ actions.add(() -> {
+ for (int j = 0; j < itemsPerGroup; j++) {
+ SegmentInfo segmentInfo = memTable.segmentInfo(groupId);
+
+ if (segmentInfo != null) {
+ assertThat(segmentInfo.getOffset(j), either(is(j +
1)).or(is(MISSING_SEGMENT_FILE_OFFSET)));
+ }
+ }
+ });
+ }
+
+ runRace(actions.toArray(RunnableX[]::new));
+
+ // Check that all values are present after all writes completed.
+ assertThat(memTable.numGroups(), is(STRIPES));
+
+ for (int groupId = 0; groupId < STRIPES; groupId++) {
+ SegmentInfo segmentInfo = memTable.segmentInfo(groupId);
+
+ assertThat(segmentInfo, is(notNullValue()));
+
+ for (int j = 0; j < itemsPerGroup; j++) {
+ assertThat(segmentInfo.getOffset(j), is(j + 1));
+ }
+ }
+ }
+}