This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 75892a78be Improve primary key serialization performance (#9538)
75892a78be is described below

commit 75892a78be165a656c0a13b460cfb9e8aacc6c1a
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Oct 7 14:29:47 2022 +0530

    Improve primary key serialization performance (#9538)
    
    * Improve primary key serialization performance
    
    * Remove unused import
    
    * Add datasize to serialized value
    
    * Add test for primary key serialization and handle nulls
    
    * fix test
    
    * Add a separate serializer for single-valued arrays
    
    Co-authored-by: Kartik Khare <[email protected]>
---
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  8 +-
 .../apache/pinot/spi/data/readers/PrimaryKey.java  | 86 +++++++++++++++++++++-
 .../pinot/spi/data/readers/PrimaryKeyTest.java     | 49 +++++++++++-
 3 files changed, 133 insertions(+), 10 deletions(-)

diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 3df1f45caa..f005038608 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -335,15 +335,15 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   public void testHashPrimaryKey() {
     PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
     assertEquals(BytesUtils.toHexString(((ByteArray) HashUtils.hashPrimaryKey(pk, HashFunction.MD5)).getBytes()),
-        "58de44997505014e02982846a4d1cbbd");
+        "6ca926be8c2d1d980acf48ba48418e24");
     assertEquals(BytesUtils.toHexString(((ByteArray) HashUtils.hashPrimaryKey(pk, HashFunction.MURMUR3)).getBytes()),
-        "7e6b4a98296292a4012225fff037fa8c");
+        "e4540494e43b27e312d01f33208c6a4e");
     // reorder
     pk = new PrimaryKey(new Object[]{"uuid-3", "uuid-2", "uuid-1"});
     assertEquals(BytesUtils.toHexString(((ByteArray) HashUtils.hashPrimaryKey(pk, HashFunction.MD5)).getBytes()),
-        "d2df12c6dea7b83f965613614eee58e2");
+        "fc2159b78d07f803fdfb0b727315a445");
     assertEquals(BytesUtils.toHexString(((ByteArray) HashUtils.hashPrimaryKey(pk, HashFunction.MURMUR3)).getBytes()),
-        "8d68b314cc0c8de4dbd55f4dad3c3e66");
+        "37fab5ef0ea39711feabcdc623cb8a4e");
   }
 
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
index b663cf6b72..27475a3e29 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
@@ -18,8 +18,12 @@
  */
 package org.apache.pinot.spi.data.readers;
 
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-import org.apache.commons.lang3.SerializationUtils;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.ByteArray;
 
 
 /**
@@ -37,7 +41,85 @@ public class PrimaryKey {
   }
 
   public byte[] asBytes() {
-    return SerializationUtils.serialize(_values);
+    if (_values.length == 1) {
+      return asBytesSingleVal(_values[0]);
+    }
+
+    int sizeInBytes = 0;
+    byte[][] cache = new byte[_values.length][];
+    for (int i = 0; i < _values.length; i++) {
+      Object value = _values[i];
+
+      if (value instanceof Integer) {
+        sizeInBytes += Integer.BYTES;
+      } else if (value instanceof Long) {
+        sizeInBytes += Long.BYTES;
+      } else if (value instanceof String) {
+        cache[i] = ((String) value).getBytes(StandardCharsets.UTF_8);
+        sizeInBytes += cache[i].length + Integer.BYTES;
+      } else if (value instanceof ByteArray) {
+        cache[i] = ((ByteArray) value).getBytes();
+        sizeInBytes += cache[i].length + Integer.BYTES;
+      } else if (value instanceof Float) {
+        sizeInBytes += Float.BYTES;
+      } else if (value instanceof Double) {
+        sizeInBytes += Double.BYTES;
+      } else if (value instanceof BigDecimal) {
+        cache[i] = BigDecimalUtils.serialize((BigDecimal) value);
+        sizeInBytes += cache[i].length + Integer.BYTES;
+      } else {
+        throw new IllegalStateException(
+            String.format("Unsupported value: %s of type: %s", value, value != null ? value.getClass() : null));
+      }
+    }
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(sizeInBytes);
+    for (int i = 0; i < _values.length; i++) {
+      Object value = _values[i];
+
+      if (value instanceof Integer) {
+        byteBuffer.putInt((Integer) value);
+      } else if (value instanceof Long) {
+        byteBuffer.putLong((Long) value);
+      } else if (value instanceof Float) {
+        byteBuffer.putFloat((Float) value);
+      } else if (value instanceof Double) {
+        byteBuffer.putDouble((Double) value);
+      } else {
+        byteBuffer.putInt(cache[i].length);
+        byteBuffer.put(cache[i]);
+      }
+    }
+    return byteBuffer.array();
+  }
+
+  private byte[] asBytesSingleVal(Object value) {
+    if (value instanceof Integer) {
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES);
+      byteBuffer.putInt((Integer) value);
+      return byteBuffer.array();
+    } else if (value instanceof Long) {
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES);
+      byteBuffer.putLong((Long) value);
+      return byteBuffer.array();
+    } else if (value instanceof String) {
+      return ((String) value).getBytes(StandardCharsets.UTF_8);
+    } else if (value instanceof ByteArray) {
+      return ((ByteArray) value).getBytes();
+    } else if (value instanceof Float) {
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Float.BYTES);
+      byteBuffer.putFloat((Float) value);
+      return byteBuffer.array();
+    } else if (value instanceof Double) {
+      ByteBuffer byteBuffer = ByteBuffer.allocate(Double.BYTES);
+      byteBuffer.putDouble((Double) value);
+      return byteBuffer.array();
+    } else if (value instanceof BigDecimal) {
+      return BigDecimalUtils.serialize((BigDecimal) value);
+    } else {
+      throw new IllegalStateException(
+          String.format("Unsupported value: %s of type: %s", value, value != null ? value.getClass() : null));
+    }
   }
 
   @Override
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
index e1c531d705..e92a3e2ed5 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pinot.spi.data.readers;
 
-import org.apache.commons.lang3.SerializationUtils;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.testng.annotations.Test;
 
@@ -43,9 +46,47 @@ public class PrimaryKeyTest {
   @Test
   public void testSerialization() {
     byte[] rawbytes = {0xa, 0x2, (byte) 0xff};
-    PrimaryKey pk = new PrimaryKey(new Object[]{"111", 2, new ByteArray(rawbytes)});
+    Object[] values = new Object[]{
+        "foo_bar", 2, 2.0d, 3.14f, System.currentTimeMillis(), new ByteArray(rawbytes), new BigDecimal(100)
+    };
+    PrimaryKey pk = new PrimaryKey(values);
     byte[] bytes = pk.asBytes();
-    PrimaryKey deserialized = new PrimaryKey((Object[]) SerializationUtils.deserialize(bytes));
-    assertEquals(deserialized, pk);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+
+    int length = byteBuffer.getInt();
+    assertEquals(length, ((String) values[0]).length());
+    byte[] arr = new byte[length];
+    byteBuffer.get(arr);
+    String out = new String(arr, StandardCharsets.UTF_8);
+    assertEquals(out, values[0]);
+
+    assertEquals(byteBuffer.getInt(), values[1]);
+    assertEquals(byteBuffer.getDouble(), values[2]);
+    assertEquals(byteBuffer.getFloat(), values[3]);
+    assertEquals(byteBuffer.getLong(), values[4]);
+
+    assertEquals(byteBuffer.getInt(), rawbytes.length);
+    arr = new byte[rawbytes.length];
+    byteBuffer.get(arr);
+    assertEquals(arr, rawbytes);
+
+    length = byteBuffer.getInt();
+    arr = new byte[length];
+    byteBuffer.get(arr);
+    assertEquals(BigDecimalUtils.deserialize(arr), values[6]);
+  }
+
+  @Test
+  public void testSerializationSingleVal() {
+    Object[] values = new Object[]{
+        "foo_bar"
+    };
+    PrimaryKey pk = new PrimaryKey(values);
+    byte[] bytes = pk.asBytes();
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    byte[] arr = new byte[7];
+    byteBuffer.get(arr);
+    String out = new String(arr, StandardCharsets.UTF_8);
+    assertEquals(out, values[0]);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to