[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-05-05 via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1185820299


##
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
 assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
 }
 
+@Test
+public void testCorrectnessWriteUnsignedVarlong() {
+// The old well-known implementation for writeUnsignedVarlong.
+LongFunction<ByteBuffer> simpleImplementation = (long value) -> {
+ByteBuffer buffer = ByteBuffer.allocate(MAX_LENGTH_VARLONG);
+while ((value & 0xffffffffffffff80L) != 0L) {
+byte b = (byte) ((value & 0x7f) | 0x80);
+buffer.put(b);
+value >>>= 7;
+}
+buffer.put((byte) value);
+
+return buffer;
+};
+
+// compare every power of two from 2^0 to 2^62
+final ByteBuffer actual = ByteBuffer.allocate(MAX_LENGTH_VARLONG);
+for (long i = 1; i < Long.MAX_VALUE && i >= 0; i = i << 1) {
+ByteUtils.writeUnsignedVarlong(i, actual);
+final ByteBuffer expected = simpleImplementation.apply(i);
+assertArrayEquals(expected.array(), actual.array(), "Implementations do not match for number=" + i);
+actual.clear();
+}
+}
+
+@Test
+public void testCorrectnessWriteUnsignedVarint() {
+// The old well-known implementation for writeUnsignedVarint.
+IntFunction<ByteBuffer> simpleImplementation = (int value) -> {
+ByteBuffer buffer = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+while (true) {
+if ((value & ~0x7F) == 0) {
+buffer.put((byte) value);
+break;
+} else {
+buffer.put((byte) ((value & 0x7F) | 0x80));
+value >>>= 7;
+}
+}
+
+return buffer;
+};
+
+// compare non-negative values across the whole int range, in steps of 13
+final ByteBuffer actual = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+for (int i = 0; i < Integer.MAX_VALUE && i >= 0; i += 13) {
+ByteUtils.writeUnsignedVarint(i, actual);
+final ByteBuffer expected = simpleImplementation.apply(i);
+assertArrayEquals(expected.array(), actual.array(), "Implementations do not match for integer=" + i);
+actual.clear();
+}
+}
+
+@Test
+public void testCorrectnessReadUnsignedVarint() {
+// The old well-known implementation for readUnsignedVarint
+Function<ByteBuffer, Integer> simpleImplementation = (ByteBuffer buffer) -> {
+int value = 0;
+int i = 0;
+int b;
+while (((b = buffer.get()) & 0x80) != 0) {
+value |= (b & 0x7f) << i;
+i += 7;
+if (i > 28)
+throw new IllegalArgumentException("Invalid varint");
+}
+value |= b << i;
+return value;
+};
+
+// compare non-negative values across the whole int range, in steps of 13
+final ByteBuffer testData = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+for (int i = 0; i < Integer.MAX_VALUE && i >= 0; i += 13) {
+ByteUtils.writeUnsignedVarint(i, testData);
+// prepare buffer for reading
+testData.flip();
+final int actual = ByteUtils.readUnsignedVarint(testData.duplicate());
+final int expected = simpleImplementation.apply(testData);
+assertEquals(expected, actual);
+testData.clear();
+}
+}
+
+@Test
+public void testCorrectnessReadUnsignedVarlong() {

Review Comment:
   Done in the latest commit.
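The "old well-known" reference implementations in the tests above are the classic loop-based varint codec: 7 data bits per byte, least-significant group first, with the high (continuation) bit set on every byte except the last. A minimal self-contained sketch of that codec, using 300 as a worked example (the class name `LoopVarint` is hypothetical and not part of Kafka):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of the loop-based unsigned varint codec used as the
// reference implementation in the tests; not Kafka's ByteUtils.
final class LoopVarint {
    // Writes 7 bits at a time, low bits first; the 0x80 bit marks "more bytes follow".
    static void writeUnsignedVarint(int value, ByteBuffer buffer) {
        while ((value & ~0x7F) != 0) {
            buffer.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buffer.put((byte) value);
    }

    // Reads groups until a byte without the continuation bit; rejects > 5 bytes.
    static int readUnsignedVarint(ByteBuffer buffer) {
        int value = 0;
        int shift = 0;
        int b;
        while (((b = buffer.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Invalid varint");
            }
        }
        return value | (b << shift);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(5);
        writeUnsignedVarint(300, buf);
        buf.flip();
        byte[] encoded = new byte[buf.remaining()];
        buf.duplicate().get(encoded);                  // copy without moving buf's position
        System.out.println(Arrays.toString(encoded));  // prints [-84, 2], i.e. 0xAC 0x02
        System.out.println(readUnsignedVarint(buf));   // prints 300
    }
}
```

300 is `0b1_0010_1100`: the low seven bits `010_1100` (0x2C) go first with the continuation bit set (0xAC), followed by the remaining `10` (0x02).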



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-05-05 via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1185820116


##
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
 assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
 }
 
+@Test
+public void testCorrectnessWriteUnsignedVarlong() {

Review Comment:
   Done in the latest commit.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-05-02 via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1182456902


##
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
 assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
 }
 
+@Test
+public void testCorrectnessWriteUnsignedVarlong() {

Review Comment:
   I would be inclined to keep this test to catch potential bugs in future 
changes to the `writeUnsignedVarlong` implementation. It covers a wider range 
of values than the existing tests do.
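One way to widen that coverage cheaply is to also test the values at every encoding-length boundary, 2^(7k) - 1 and 2^(7k), since an off-by-one in an unrolled implementation shows up exactly there and a powers-of-two sweep never hits 2^(7k) - 1. A minimal sketch, assuming a hypothetical closed-form size function as the "candidate" being checked against the loop-based encoder (none of these names are Kafka APIs):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical differential check at varlong length boundaries.
final class VarlongBoundaryCheck {
    // Reference: encoded length from the loop-based writeUnsignedVarlong.
    static int encodedLength(long value) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        while ((value & 0xffffffffffffff80L) != 0L) {
            buffer.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buffer.put((byte) value);
        return buffer.position();
    }

    // Candidate: one byte per started group of 7 significant bits (0 still takes 1 byte).
    static int closedFormLength(long value) {
        int significantBits = 64 - Long.numberOfLeadingZeros(value | 1);
        return (significantBits + 6) / 7;
    }

    // 0, -1 (all bits set) and both sides of each 2^(7k) boundary, k = 1..9.
    static List<Long> boundaryValues() {
        List<Long> values = new ArrayList<>();
        values.add(0L);
        values.add(-1L);
        for (int k = 1; k <= 9; k++) {
            long boundary = 1L << (7 * k); // smallest unsigned value needing k + 1 bytes
            values.add(boundary - 1);
            values.add(boundary);
        }
        return values;
    }

    // Returns the first value where the two implementations disagree, or null.
    static Long firstMismatch() {
        for (long v : boundaryValues()) {
            if (encodedLength(v) != closedFormLength(v)) {
                return v;
            }
        }
        return null;
    }
}
```

In a real test the candidate would be the new implementation under review, and the assertion would compare encoded bytes rather than just lengths.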






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-04-11 via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1162915231


##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -227,17 +268,73 @@ public static int readVarint(DataInput in) throws IOException {
  * @throws IOException  if {@link DataInput} throws {@link IOException}
  */
 public static long readVarlong(DataInput in) throws IOException {
-long value = 0L;
-int i = 0;
-long b;
-while (((b = in.readByte()) & 0x80) != 0) {
-value |= (b & 0x7f) << i;
-i += 7;
-if (i > 63)
-throw illegalVarlongException(value);
+long raw = readUnsignedVarlong(in);
+return (raw >>> 1) ^ -(raw & 1);
+}
+
+/**
+ * For implementation details see {@link #readUnsignedVarlong(ByteBuffer)}

Review Comment:
   Done in the latest commit.
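The new `readVarlong(DataInput)` in this hunk delegates to `readUnsignedVarlong` and then undoes the ZigZag mapping with `(raw >>> 1) ^ -(raw & 1)`. ZigZag interleaves signed values (0, -1, 1, -2, ... map to 0, 1, 2, 3, ...) so that small negative numbers also encode in few bytes. A minimal sketch of both directions; the class and method names are hypothetical:

```java
// Hypothetical sketch of the ZigZag mapping used for signed varlongs.
final class ZigZag {
    // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    // (n >> 63) is all ones for negative n and zero otherwise.
    static long encode(long n) {
        return (n << 1) ^ (n >> 63);
    }

    // Inverse: same expression as the new readVarlong in the diff.
    // -(raw & 1) is all ones when the low bit is set, flipping the shifted value back.
    static long decode(long raw) {
        return (raw >>> 1) ^ -(raw & 1);
    }
}
```

Because the unsigned read path does all the byte handling, the signed variants reduce to this single line on top of it.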






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-04-11 via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1162901912


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java:
##
@@ -17,68 +17,241 @@
 
 package org.apache.kafka.jmh.util;
 
-import java.util.concurrent.ThreadLocalRandom;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.utils.ByteUtils;
 import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.CompilerControl;
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.RunnerException;
 import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 
-@State(Scope.Benchmark)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@OutputTimeUnit(TimeUnit.SECONDS)
 @Fork(3)
-@Warmup(iterations = 5, time = 1)
-@Measurement(iterations = 10, time = 1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
 public class ByteUtilsBenchmark {

Review Comment:
   Done in the latest commit.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-04-04 via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1157177692


##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException {
  * @throws IOException  if {@link DataInput} throws {@link IOException}
  */
 public static long readVarlong(DataInput in) throws IOException {
-long value = 0L;
-int i = 0;
-long b;
-while (((b = in.readByte()) & 0x80) != 0) {
-value |= (b & 0x7f) << i;
-i += 7;
-if (i > 63)
-throw illegalVarlongException(value);
+long raw = readUnsignedVarlong(in);
+return (raw >>> 1) ^ -(raw & 1);
+}
+
+private static long readUnsignedVarlong(DataInput in) throws IOException {
+byte tmp = in.readByte();

Review Comment:
   Good idea. As you recommended, I have added benchmarks for different 
encoded sizes of varint and varlong.
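A benchmark parameterised by varint size needs inputs whose encoding is exactly N bytes. Read as an unsigned 32-bit number, an N-byte varint lies in [2^(7(N-1)), 2^(7N)), except that 1-byte values also include 0 and 5-byte values run up to 2^32. A minimal sketch of such an input generator (the class and method names are hypothetical, not taken from `ByteUtilsBenchmark`):

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper for generating benchmark inputs of a given varint size.
final class VarintSizedValues {
    // Number of bytes the loop-based encoder emits for `value` (1..5).
    static int encodedSize(int value) {
        int size = 1;
        while ((value & ~0x7F) != 0) {
            value >>>= 7;
            size++;
        }
        return size;
    }

    // Random int whose unsigned varint encoding is exactly `size` bytes.
    static int randomWithEncodedSize(int size) {
        if (size < 1 || size > 5) {
            throw new IllegalArgumentException("size must be between 1 and 5: " + size);
        }
        long min = size == 1 ? 0L : 1L << (7 * (size - 1));
        long maxExclusive = size == 5 ? 1L << 32 : 1L << (7 * size);
        long unsigned = ThreadLocalRandom.current().nextLong(min, maxExclusive);
        return (int) unsigned; // values >= 2^31 wrap to negative ints, still 5 bytes
    }
}
```

In a JMH benchmark, a `@Setup` method could fill an input array with `randomWithEncodedSize(size)` for a `@Param`-driven `size`, so each run measures a single encoded length.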






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-02-28 via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120354148


##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -292,29 +415,66 @@ public static double readDouble(ByteBuffer buffer) {
  * @param buffer The output to write to
  */
 public static void writeUnsignedVarint(int value, ByteBuffer buffer) {

Review Comment:
   Netty uses the default loop-based implementation: 
https://github.com/netty/netty/blob/5d1f99655918c9c034ca090d51b64eced73f742f/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java#L58
 
   
   I wasn't able to find a project that does what we are doing, except for the 
[blog post](https://steinborn.me/posts/performance/how-fast-can-you-write-a-varint/) 
I mentioned above and 
[this implementation](https://github.com/richardstartin/varints/blob/main/src/main/java/io/github/richardstartin/varints/SmartNoDataDependencyVarIntState.java).
   
   Note that protobuf uses an unrolled implementation for its C++ code at 
   
https://github.com/protocolbuffers/protobuf/blob/2dc5338ea222e1f4e0357e46b702ed6a0e82aaeb/src/google/protobuf/io/coded_stream.h#L913
 This doesn't necessarily carry over to us, since the Java and C++ compilers 
optimize differently, but I'm adding it here as a data point.
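For comparison with the loop-based writer, an "unrolled" encoder decides the encoded length up front with a chain of range checks and then emits a fixed sequence of `put` calls, instead of looping once per 7-bit group. A minimal sketch of that idea; it is illustrative only and not the code in this PR (class and method names are hypothetical):

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of an unrolled unsigned varint writer.
final class UnrolledVarintWriter {
    static void writeUnsignedVarint(int value, ByteBuffer buffer) {
        if ((value & ~0x7F) == 0) {                 // fits in 7 bits: 1 byte
            buffer.put((byte) value);
        } else if ((value & ~0x3FFF) == 0) {        // fits in 14 bits: 2 bytes
            buffer.put((byte) ((value & 0x7F) | 0x80));
            buffer.put((byte) (value >>> 7));
        } else if ((value & ~0x1FFFFF) == 0) {      // fits in 21 bits: 3 bytes
            buffer.put((byte) ((value & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
            buffer.put((byte) (value >>> 14));
        } else if ((value & ~0xFFFFFFF) == 0) {     // fits in 28 bits: 4 bytes
            buffer.put((byte) ((value & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 14) & 0x7F) | 0x80));
            buffer.put((byte) (value >>> 21));
        } else {                                    // needs all 5 bytes
            buffer.put((byte) ((value & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 14) & 0x7F) | 0x80));
            buffer.put((byte) (((value >>> 21) & 0x7F) | 0x80));
            buffer.put((byte) (value >>> 28));      // top 4 bits, continuation bit clear
        }
    }

    // Loop-based reference, kept here so the two can be compared byte for byte.
    static void writeUnsignedVarintLoop(int value, ByteBuffer buffer) {
        while ((value & ~0x7F) != 0) {
            buffer.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buffer.put((byte) value);
    }
}
```

Whether this beats the loop on the JVM depends on JIT inlining and branch prediction, which is why benchmark numbers matter more than the C++ data point.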






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-02-28 via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120328865


##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException {
  * @throws IOException  if {@link DataInput} throws {@link IOException}
  */
 public static long readVarlong(DataInput in) throws IOException {
-long value = 0L;
-int i = 0;
-long b;
-while (((b = in.readByte()) & 0x80) != 0) {
-value |= (b & 0x7f) << i;
-i += 7;
-if (i > 63)
-throw illegalVarlongException(value);
+long raw = readUnsignedVarlong(in);
+return (raw >>> 1) ^ -(raw & 1);
+}
+
+private static long readUnsignedVarlong(DataInput in) throws IOException {
+byte tmp = in.readByte();

Review Comment:
   No, I wrote this by extending Netty's varint32 implementation to work 
with 64 bits (it's simple loop unrolling, no fancy logic). Inlining heuristics 
become less predictable as the size of the function grows, so we don't see 
much benefit from the 64-bit implementation here. I don't have a strong 
opinion on the 64-bit implementation and would be happy to fall back to the 
exact implementation that Protobuf uses. [1]
   
   
   [1] 
https://github.com/protocolbuffers/protobuf/blob/main/java/core/src/main/java/com/google/protobuf/CodedInputStream.java#L1048
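The Netty-style unrolling described here can be sketched for the 32-bit case: each byte is read into a signed `byte`, so a non-negative value means the continuation bit is clear and the varint ends there. The 64-bit version follows the same pattern for up to ten bytes, which is where the function grows large enough for inlining to become less predictable. A minimal sketch over `ByteBuffer`, using early returns instead of Netty's nesting; illustrative only, not the PR's or Netty's exact code (names are hypothetical):

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of an unrolled unsigned varint reader (32-bit).
final class UnrolledVarintReader {
    static int readUnsignedVarint(ByteBuffer buffer) {
        byte tmp = buffer.get();
        if (tmp >= 0) {                        // high bit clear: 1-byte varint
            return tmp;
        }
        int result = tmp & 0x7F;
        if ((tmp = buffer.get()) >= 0) {
            return result | (tmp << 7);
        }
        result |= (tmp & 0x7F) << 7;
        if ((tmp = buffer.get()) >= 0) {
            return result | (tmp << 14);
        }
        result |= (tmp & 0x7F) << 14;
        if ((tmp = buffer.get()) >= 0) {
            return result | (tmp << 21);
        }
        result |= (tmp & 0x7F) << 21;
        tmp = buffer.get();
        if (tmp < 0) {                         // a 6th byte would be needed: malformed
            throw new IllegalArgumentException("Invalid varint: more than 5 bytes");
        }
        return result | (tmp << 28);
    }
}
```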






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-02-28 via GitHub


divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120323557


##
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##
@@ -150,17 +151,32 @@ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
  * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
  */
 public static int readUnsignedVarint(ByteBuffer buffer) {

Review Comment:
   Sure, will do. 
   
   Also, protobuf has a similar unrolled implementation for its C++ code at 
https://github.com/protocolbuffers/protobuf/blob/2dc5338ea222e1f4e0357e46b702ed6a0e82aaeb/src/google/protobuf/io/coded_stream.cc#L422
 (its Java variant doesn't use the unrolled implementation, because they chose 
a different implementation that favours the case where varints are 1 byte long).
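The "favour 1-byte varints" idea can be sketched as a tiny hot method that returns immediately when the first byte has its continuation bit clear, and otherwise hands off to a general loop in a separate method. Keeping the common path small may make it easier for the JIT to inline, though that is an assumption to confirm with benchmarks. This only illustrates the idea; it is not protobuf's actual code, and the names are hypothetical:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a 1-byte fast path with a loop-based slow path.
final class FastPathVarintReader {
    static int readUnsignedVarint(ByteBuffer buffer) {
        byte first = buffer.get();
        if (first >= 0) {                       // common case: single byte, no loop
            return first;
        }
        return readRemaining(first & 0x7F, buffer);
    }

    // Continues decoding after the first byte; rejects varints longer than 5 bytes.
    private static int readRemaining(int value, ByteBuffer buffer) {
        int shift = 7;
        int b;
        while (((b = buffer.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Invalid varint");
            }
        }
        return value | (b << shift);
    }
}
```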


