This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fcdc36f16e9 IGNITE-26419 Optimize V1Encoder for ByteBuffers (#6603)
fcdc36f16e9 is described below
commit fcdc36f16e9c9859c0cb631442f7f09a62088cd3
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Sep 18 11:48:08 2025 +0300
IGNITE-26419 Optimize V1Encoder for ByteBuffers (#6603)
---
.../raft/storage/segstore/SegmentPayload.java | 19 ++++--
.../raft/jraft/entity/codec/LogEntryEncoder.java | 14 ++++
.../raft/jraft/entity/codec/v1/V1Encoder.java | 74 +++++++++++-----------
.../ignite/raft/jraft/util/AsciiStringUtil.java | 7 +-
.../storage/segstore/SegmentFileManagerTest.java | 19 +++++-
.../storage/segstore/SegstoreLogStorageTest.java | 43 ++++++++++++-
.../raft/jraft/entity/codec/v1/V1EncoderTest.java | 13 ++++
7 files changed, 138 insertions(+), 51 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
index 1e0e191b433..90c10f15488 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
@@ -36,12 +36,18 @@ class SegmentPayload {
private final long groupId;
- private final byte[] payload;
+ private final int payloadSize;
+
+ private final LogEntry logEntry;
+
+ private final LogEntryEncoder logEntryEncoder;
SegmentPayload(long groupId, LogEntry logEntry, LogEntryEncoder
logEntryEncoder) {
this.groupId = groupId;
- // TODO: optimize, see
https://issues.apache.org/jira/browse/IGNITE-26419
- this.payload = logEntryEncoder.encode(logEntry);
+ this.logEntry = logEntry;
+ this.logEntryEncoder = logEntryEncoder;
+
+ payloadSize = logEntryEncoder.size(logEntry);
}
void writeTo(ByteBuffer buffer) {
@@ -49,8 +55,9 @@ class SegmentPayload {
buffer
.putLong(groupId)
- .putInt(payload.length)
- .put(payload);
+ .putInt(payloadSize);
+
+ logEntryEncoder.encode(buffer, logEntry);
int dataSize = buffer.position() - originalPos;
@@ -64,7 +71,7 @@ class SegmentPayload {
}
int size() {
- return overheadSize() + payload.length;
+ return overheadSize() + payloadSize;
}
static int overheadSize() {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryEncoder.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryEncoder.java
index 1759b0cbb08..9a002448d0e 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryEncoder.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryEncoder.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.raft.jraft.entity.codec;
+import java.nio.ByteBuffer;
import org.apache.ignite.raft.jraft.entity.LogEntry;
/**
@@ -29,4 +30,17 @@ public interface LogEntryEncoder {
* @return encoded byte array
*/
byte[] encode(LogEntry log);
+
+ /**
+ * Same as {@link #encode(LogEntry)} but writes the result into a buffer.
+ *
+ * @param buffer Buffer to write into. The buffer must be large enough to
hold the encoded log entry.
+ * @param log Log entry.
+ */
+ void encode(ByteBuffer buffer, LogEntry log);
+
+ /**
+ * Returns the size in bytes of an encoded log entry.
+ */
+ int size(LogEntry logEntry);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
index 7a1840eb266..f28263c6750 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft.entity.codec.v1;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.List;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
@@ -42,6 +43,7 @@ public final class V1Encoder implements LogEntryEncoder {
*
* @param logEntry Log entry.
*/
+ @Override
public int size(LogEntry logEntry) {
EntryType type = logEntry.getType();
LogId id = logEntry.getId();
@@ -125,46 +127,50 @@ public final class V1Encoder implements LogEntryEncoder {
// Refactored to look closer to Ignites code style.
@Override
public byte[] encode(final LogEntry log) {
+ int totalLen = size(log);
+
+ ByteBuffer buffer =
ByteBuffer.allocate(totalLen).order(ByteOrder.LITTLE_ENDIAN);
+
+ encode(buffer, log);
+
+ return buffer.array();
+ }
+
+ @Override
+ public void encode(ByteBuffer buffer, LogEntry log) {
EntryType type = log.getType();
LogId id = log.getId();
List<PeerId> peers = log.getPeers();
List<PeerId> oldPeers = log.getOldPeers();
List<PeerId> learners = log.getLearners();
List<PeerId> oldLearners = log.getOldLearners();
- ByteBuffer data = log.getData();
+ ByteBuffer data = log.getReadOnlyData();
int typeNumber = type.getNumber();
long index = id.getIndex();
long term = id.getTerm();
- int totalLen = size(log);
-
- byte[] content = new byte[totalLen];
- content[0] = LogEntryV1CodecFactory.MAGIC;
- int pos = LogEntryV1CodecFactory.PAYLOAD_OFFSET;
+ buffer.put(LogEntryV1CodecFactory.MAGIC);
- pos = writeLong(typeNumber, content, pos);
- pos = writeLong(index, content, pos);
- pos = writeLong(term, content, pos);
+ writeLong(typeNumber, buffer);
+ writeLong(index, buffer);
+ writeLong(term, buffer);
- Bits.putLongLittleEndian(content, pos, log.getChecksum());
- pos += Long.BYTES;
+ buffer.putLong(log.getChecksum());
if (type != EntryType.ENTRY_TYPE_DATA) {
- pos = writeNodesList(pos, content, peers);
+ writeNodesList(buffer, peers);
- pos = writeNodesList(pos, content, oldPeers);
+ writeNodesList(buffer, oldPeers);
- pos = writeNodesList(pos, content, learners);
+ writeNodesList(buffer, learners);
- pos = writeNodesList(pos, content, oldLearners);
+ writeNodesList(buffer, oldLearners);
}
if (type != EntryType.ENTRY_TYPE_CONFIGURATION && data != null) {
- System.arraycopy(data.array(), data.position(), content, pos,
data.remaining());
+ buffer.put(data);
}
-
- return content;
}
private static int nodesListSizeInBytes(@Nullable List<PeerId> nodes) {
@@ -210,45 +216,39 @@ public final class V1Encoder implements LogEntryEncoder {
return addr;
}
- private static int writeNodesList(int pos, byte[] content, List<PeerId>
nodeStrs) {
+ private static void writeNodesList(ByteBuffer content, List<PeerId>
nodeStrs) {
if (nodeStrs == null) {
- content[pos] = 0;
+ content.put((byte) 0);
- return pos + 1;
+ return;
}
- pos = writeLong(nodeStrs.size(), content, pos);
+ writeLong(nodeStrs.size(), content);
for (PeerId peerId : nodeStrs) {
String consistentId = peerId.getConsistentId();
int length = consistentId.length();
- Bits.putShortLittleEndian(content, pos, (short) length);
- pos += Short.BYTES;
+ content.putShort((short) length);
- AsciiStringUtil.unsafeEncode(consistentId, content, pos);
- pos += length;
+ AsciiStringUtil.unsafeEncode(consistentId, content);
- pos = writeLong(peerId.getIdx(), content, pos);
- pos = writeLong(peerId.getPriority() + 1, content, pos);
+ writeLong(peerId.getIdx(), content);
+ writeLong(peerId.getPriority() + 1, content);
}
-
- return pos;
}
// Based on DirectByteBufferStreamImplV1.
- private static int writeLong(long val, byte[] out, int pos) {
+ private static void writeLong(long val, ByteBuffer out) {
while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
byte b = (byte) (val | 0x80);
- out[pos++] = b;
+ out.put(b);
val >>>= 7;
}
- out[pos++] = (byte) val;
-
- return pos;
+ out.put((byte) val);
}
private static long writeLong(long val, long addr) {
@@ -266,7 +266,7 @@ public final class V1Encoder implements LogEntryEncoder {
}
/**
- * Returns the number of bytes, required by the {@link #writeLong(long,
byte[], int)} to write the value.
+ * Returns the number of bytes, required by the {@link #writeLong} to
write the value.
*/
private static int sizeInBytes(long val) {
if (val >= 0) {
@@ -293,4 +293,4 @@ public final class V1Encoder implements LogEntryEncoder {
return 10;
}
}
-}
\ No newline at end of file
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java
index 0fb8134fd1a..0156f54f9b3 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java
@@ -16,19 +16,18 @@
*/
package org.apache.ignite.raft.jraft.util;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
*
*/
public final class AsciiStringUtil {
-
- public static byte[] unsafeEncode(final CharSequence in, byte[] out, int
offset) {
+ public static void unsafeEncode(final CharSequence in, ByteBuffer out) {
final int len = in.length();
for (int i = 0; i < len; i++) {
- out[i + offset] = (byte) in.charAt(i);
+ out.put((byte) in.charAt(i));
}
- return out;
}
public static byte[] unsafeEncode(final CharSequence in) {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index 1aa2e544d77..8b22e2c5ccb 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -41,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -59,6 +60,7 @@ import
org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
@@ -355,7 +357,22 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
entry.setId(new LogId(index, 0));
- fileManager.appendEntry(groupId, entry, e -> serializedEntry);
+ fileManager.appendEntry(groupId, entry, new LogEntryEncoder() {
+ @Override
+ public byte[] encode(LogEntry log) {
+ return serializedEntry;
+ }
+
+ @Override
+ public void encode(ByteBuffer buffer, LogEntry log) {
+ buffer.put(serializedEntry);
+ }
+
+ @Override
+ public int size(LogEntry logEntry) {
+ return serializedEntry.length;
+ }
+ });
}
private static void validateEntry(byte[] entry, byte[] expectedPayload) {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
index 892bc0a0789..69253b8af85 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java
@@ -26,10 +26,12 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -102,7 +104,15 @@ class SegstoreLogStorageTest extends IgniteAbstractTest {
void testAppendEntry() throws IOException {
byte[] payload = {1, 2, 3, 4, 5};
- when(encoder.encode(any())).thenReturn(payload);
+ doAnswer(invocation -> {
+ ByteBuffer buffer = invocation.getArgument(0);
+
+ buffer.put(payload);
+
+ return null;
+ }).when(encoder).encode(any(), any());
+
+ when(encoder.size(any())).thenAnswer(invocation -> payload.length);
logStorage.appendEntry(new LogEntry());
@@ -125,9 +135,36 @@ class SegstoreLogStorageTest extends IgniteAbstractTest {
void testAppendEntries() throws IOException {
List<byte[]> payloads = generateRandomData();
- Iterator<byte[]> payloadsIterator = payloads.iterator();
+ var iteratorEncoder = new LogEntryEncoder() {
+ private final Iterator<byte[]> payloadsIterator =
payloads.iterator();
+
+ private byte[] nextPayload;
+
+ @Override
+ public byte[] encode(LogEntry log) {
+ return fail("Should not be called.");
+ }
+
+ @Override
+ public void encode(ByteBuffer buffer, LogEntry log) {
+ buffer.put(nextPayload);
+ }
+
+ @Override
+ public int size(LogEntry logEntry) {
+ nextPayload = payloadsIterator.next();
+
+ return nextPayload.length;
+ }
+ };
+
+ doAnswer(invocation -> {
+ iteratorEncoder.encode(invocation.getArgument(0),
invocation.getArgument(1));
+
+ return null;
+ }).when(encoder).encode(any(), any());
- when(encoder.encode(any())).thenAnswer(invocation ->
payloadsIterator.next());
+ when(encoder.size(any())).thenAnswer(invocation ->
iteratorEncoder.size(invocation.getArgument(0)));
List<LogEntry> entries = IntStream.range(0, payloads.size())
.mapToObj(i -> {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1EncoderTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1EncoderTest.java
index 0a20b5ebff6..eb5228be9d2 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1EncoderTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1EncoderTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.raft.jraft.entity.codec.v1;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.internal.util.GridUnsafe;
@@ -48,6 +49,18 @@ class V1EncoderTest {
assertEquals(logEntry, decodedEntry);
}
+ @ParameterizedTest
+ @FieldSource("ENTRIES")
+ void testEncodeDecodeByteBuffer(LogEntry logEntry) {
+ ByteBuffer buf =
ByteBuffer.allocate(V1Encoder.INSTANCE.size(logEntry)).order(ByteOrder.LITTLE_ENDIAN);
+
+ V1Encoder.INSTANCE.encode(buf, logEntry);
+
+ LogEntry decodedEntry = V1Decoder.INSTANCE.decode(buf.array());
+
+ assertEquals(logEntry, decodedEntry);
+ }
+
@ParameterizedTest
@FieldSource("ENTRIES")
void testSize(LogEntry logEntry) {