This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 75892a78be Improve primary key serialization performance (#9538)
75892a78be is described below
commit 75892a78be165a656c0a13b460cfb9e8aacc6c1a
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Oct 7 14:29:47 2022 +0530
Improve primary key serialization performance (#9538)
* Improve primary key serialization performance
* Remove unused import
* Add data size to serialized value
* Add test for primary key serialization and handle nulls
* fix test
* Add a separate serializer for single-valued arrays
Co-authored-by: Kartik Khare <[email protected]>
---
...rrentMapPartitionUpsertMetadataManagerTest.java | 8 +-
.../apache/pinot/spi/data/readers/PrimaryKey.java | 86 +++++++++++++++++++++-
.../pinot/spi/data/readers/PrimaryKeyTest.java | 49 +++++++++++-
3 files changed, 133 insertions(+), 10 deletions(-)
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 3df1f45caa..f005038608 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -335,15 +335,15 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
public void testHashPrimaryKey() {
+    // NOTE(review): these expected digests appear to be computed over PrimaryKey.asBytes(), so they are
+    // pinned to that byte layout (they change whenever it changes); confirm in HashUtils.hashPrimaryKey.
PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
assertEquals(BytesUtils.toHexString(((ByteArray)
HashUtils.hashPrimaryKey(pk, HashFunction.MD5)).getBytes()),
- "58de44997505014e02982846a4d1cbbd");
+ "6ca926be8c2d1d980acf48ba48418e24");
assertEquals(BytesUtils.toHexString(((ByteArray)
HashUtils.hashPrimaryKey(pk, HashFunction.MURMUR3)).getBytes()),
- "7e6b4a98296292a4012225fff037fa8c");
+ "e4540494e43b27e312d01f33208c6a4e");
// reorder
+    // The same values in a different order are expected to produce different digests.
pk = new PrimaryKey(new Object[]{"uuid-3", "uuid-2", "uuid-1"});
assertEquals(BytesUtils.toHexString(((ByteArray)
HashUtils.hashPrimaryKey(pk, HashFunction.MD5)).getBytes()),
- "d2df12c6dea7b83f965613614eee58e2");
+ "fc2159b78d07f803fdfb0b727315a445");
assertEquals(BytesUtils.toHexString(((ByteArray)
HashUtils.hashPrimaryKey(pk, HashFunction.MURMUR3)).getBytes()),
- "8d68b314cc0c8de4dbd55f4dad3c3e66");
+ "37fab5ef0ea39711feabcdc623cb8a4e");
}
/**
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
index b663cf6b72..27475a3e29 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
@@ -18,8 +18,12 @@
*/
package org.apache.pinot.spi.data.readers;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
-import org.apache.commons.lang3.SerializationUtils;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ByteArray;
/**
@@ -37,7 +41,85 @@ public class PrimaryKey {
}
+  /**
+   * Serializes the primary key values into bytes.
+   *
+   * <p>A single-value key is encoded as the bare bytes of that value (no length prefix). A multi-value
+   * key is the concatenation, in column order, of each value's encoding: Integer/Long/Float/Double as
+   * their fixed-width {@link ByteBuffer} representation, and String (UTF-8), ByteArray and BigDecimal
+   * as a 4-byte length prefix followed by the encoded bytes. No type tags are written.
+   *
+   * @return the serialized key; an empty array when there are no values
+   * @throws IllegalStateException if any value is null or of an unsupported type
+   */
public byte[] asBytes() {
- return SerializationUtils.serialize(_values);
+    if (_values.length == 1) {
+      return asBytesSingleVal(_values[0]);
+    }
+
+    // Pass 1: compute the exact output size. Variable-width values are encoded once here and cached so
+    // pass 2 does not have to encode them again.
+    byte[][] encodedValues = new byte[_values.length][];
+    int totalSize = 0;
+    for (int i = 0; i < _values.length; i++) {
+      int fixedWidth = fixedWidthSize(_values[i]);
+      if (fixedWidth > 0) {
+        totalSize += fixedWidth;
+      } else {
+        encodedValues[i] = variableWidthBytes(_values[i]);
+        totalSize += Integer.BYTES + encodedValues[i].length;
+      }
+    }
+
+    // Pass 2: write every value, in column order, into a buffer of exactly that size.
+    ByteBuffer buffer = ByteBuffer.allocate(totalSize);
+    for (int i = 0; i < _values.length; i++) {
+      byte[] encoded = encodedValues[i];
+      if (encoded == null) {
+        putFixedWidth(buffer, _values[i]);
+      } else {
+        buffer.putInt(encoded.length).put(encoded);
+      }
+    }
+    return buffer.array();
+  }
+
+  /**
+   * Returns the encoded width in bytes of a fixed-width value (Integer, Long, Float or Double), or 0 if
+   * the value is not one of those types (including null).
+   */
+  private static int fixedWidthSize(Object value) {
+    if (value instanceof Integer) {
+      return Integer.BYTES;
+    }
+    if (value instanceof Long) {
+      return Long.BYTES;
+    }
+    if (value instanceof Float) {
+      return Float.BYTES;
+    }
+    if (value instanceof Double) {
+      return Double.BYTES;
+    }
+    return 0;
+  }
+
+  /**
+   * Writes a fixed-width value into the buffer at its current position. Callers must only pass values
+   * for which {@link #fixedWidthSize(Object)} is non-zero.
+   */
+  private static void putFixedWidth(ByteBuffer buffer, Object value) {
+    if (value instanceof Integer) {
+      buffer.putInt((Integer) value);
+    } else if (value instanceof Long) {
+      buffer.putLong((Long) value);
+    } else if (value instanceof Float) {
+      buffer.putFloat((Float) value);
+    } else {
+      buffer.putDouble((Double) value);
+    }
+  }
+
+  /**
+   * Encodes a variable-width value without any length prefix: String as UTF-8, ByteArray as its raw
+   * bytes, BigDecimal via {@link BigDecimalUtils#serialize(BigDecimal)}.
+   *
+   * @throws IllegalStateException if the value is not a String, ByteArray or BigDecimal (including null)
+   */
+  private static byte[] variableWidthBytes(Object value) {
+    if (value instanceof String) {
+      return ((String) value).getBytes(StandardCharsets.UTF_8);
+    }
+    if (value instanceof ByteArray) {
+      return ((ByteArray) value).getBytes();
+    }
+    if (value instanceof BigDecimal) {
+      return BigDecimalUtils.serialize((BigDecimal) value);
+    }
+    throw new IllegalStateException(
+        String.format("Unsupported value: %s of type: %s", value, value != null ? value.getClass() : null));
+  }
+
+  /** Encodes a single-column key as the bare bytes of its value, with no length prefix. */
+  private byte[] asBytesSingleVal(Object value) {
+    int fixedWidth = fixedWidthSize(value);
+    if (fixedWidth == 0) {
+      return variableWidthBytes(value);
+    }
+    ByteBuffer buffer = ByteBuffer.allocate(fixedWidth);
+    putFixedWidth(buffer, value);
+    return buffer.array();
}
@Override
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
index e1c531d705..e92a3e2ed5 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
@@ -18,7 +18,10 @@
*/
package org.apache.pinot.spi.data.readers;
-import org.apache.commons.lang3.SerializationUtils;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.testng.annotations.Test;
@@ -43,9 +46,47 @@ public class PrimaryKeyTest {
@Test
public void testSerialization() {
byte[] rawbytes = {0xa, 0x2, (byte) 0xff};
- PrimaryKey pk = new PrimaryKey(new Object[]{"111", 2, new
ByteArray(rawbytes)});
+    // The string deliberately contains a non-ASCII char so UTF-8 byte length differs from String.length().
+    Object[] values = new Object[]{
+        "foo_bar_\u00e9", 2, 2.0d, 3.14f, System.currentTimeMillis(), new ByteArray(rawbytes), new BigDecimal(100)
+    };
+    PrimaryKey pk = new PrimaryKey(values);
byte[] bytes = pk.asBytes();
- PrimaryKey deserialized = new PrimaryKey((Object[])
SerializationUtils.deserialize(bytes));
- assertEquals(deserialized, pk);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+
+    // Variable-width values carry a 4-byte length prefix equal to their encoded (UTF-8) byte length.
+    byte[] expectedString = ((String) values[0]).getBytes(StandardCharsets.UTF_8);
+    int length = byteBuffer.getInt();
+    assertEquals(length, expectedString.length);
+    byte[] arr = new byte[length];
+    byteBuffer.get(arr);
+    assertEquals(new String(arr, StandardCharsets.UTF_8), values[0]);
+
+    // Fixed-width values are written without a prefix.
+    assertEquals(byteBuffer.getInt(), values[1]);
+    assertEquals(byteBuffer.getDouble(), values[2]);
+    assertEquals(byteBuffer.getFloat(), values[3]);
+    assertEquals(byteBuffer.getLong(), values[4]);
+
+    assertEquals(byteBuffer.getInt(), rawbytes.length);
+    arr = new byte[rawbytes.length];
+    byteBuffer.get(arr);
+    assertEquals(arr, rawbytes);
+
+    length = byteBuffer.getInt();
+    arr = new byte[length];
+    byteBuffer.get(arr);
+    assertEquals(BigDecimalUtils.deserialize(arr), values[6]);
+
+    // The encoding must contain exactly these values and nothing more.
+    assertEquals(byteBuffer.remaining(), 0);
+  }
+
+  @Test
+  public void testSerializationSingleVal() {
+    String value = "foo_bar";
+    byte[] bytes = new PrimaryKey(new Object[]{value}).asBytes();
+    // A single-value key is encoded as the bare UTF-8 bytes of the value, with no length prefix.
+    assertEquals(bytes, value.getBytes(StandardCharsets.UTF_8));
+    assertEquals(new String(bytes, StandardCharsets.UTF_8), value);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testSerializationRejectsNull() {
+    // Null values have no encoding; asBytes() rejects them instead of silently dropping them.
+    new PrimaryKey(new Object[]{"foo_bar", null}).asBytes();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]