This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c9234130c Fix PulsarUtils to not share buffer (#12671)
8c9234130c is described below

commit 8c9234130c938a162390b051e581e5ed849c19c6
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Mar 19 12:56:12 2024 -0700

    Fix PulsarUtils to not share buffer (#12671)
---
 .../pinot/plugin/stream/pulsar/PulsarUtils.java    | 56 ++++++++--------------
 1 file changed, 19 insertions(+), 37 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
index d22f8b0b5f..772357aa6b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
@@ -18,77 +18,59 @@
  */
 package org.apache.pinot.plugin.stream.pulsar;
 
-import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class PulsarUtils {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(PulsarUtils.class);
-
-  private static final ByteBuffer LENGTH_BUF = ByteBuffer.allocate(4);
-
   private PulsarUtils() {
   }
 
-  public static SubscriptionInitialPosition offsetCriteriaToSubscription(OffsetCriteria offsetCriteria)
-      throws IllegalArgumentException {
+  public static SubscriptionInitialPosition offsetCriteriaToSubscription(OffsetCriteria offsetCriteria) {
     if (offsetCriteria.isLargest()) {
       return SubscriptionInitialPosition.Latest;
     }
     if (offsetCriteria.isSmallest()) {
       return SubscriptionInitialPosition.Earliest;
     }
-
-    throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+    throw new IllegalArgumentException("Unsupported offset criteria: " + offsetCriteria);
   }
 
-  public static MessageId offsetCriteriaToMessageId(OffsetCriteria offsetCriteria)
-      throws IllegalArgumentException {
+  public static MessageId offsetCriteriaToMessageId(OffsetCriteria offsetCriteria) {
     if (offsetCriteria.isLargest()) {
       return MessageId.latest;
     }
     if (offsetCriteria.isSmallest()) {
       return MessageId.earliest;
     }
-
-    throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+    throw new IllegalArgumentException("Unsupported offset criteria: " + offsetCriteria);
   }
 
   /**
    * Stitch key and value bytes together using a simple format:
    * 4 bytes for key length + key bytes + 4 bytes for value length + value bytes
    */
-  protected static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
-    int keyLen = keyBytes.length;
-    int valueLen = valueBytes.length;
-    int totalByteArrayLength = 8 + keyLen + valueLen;
-    try (ByteArrayOutputStream bos = new ByteArrayOutputStream(totalByteArrayLength)) {
-      LENGTH_BUF.clear();
-      bos.write(LENGTH_BUF.putInt(keyLen).array());
-      bos.write(keyBytes);
-      LENGTH_BUF.clear();
-      bos.write(LENGTH_BUF.putInt(valueLen).array());
-      bos.write(valueBytes);
-      return bos.toByteArray();
-    } catch (Exception e) {
-      LOGGER.error("Unable to stitch key and value bytes together", e);
-    }
-    return null;
+  public static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
+    byte[] stitchedBytes = new byte[8 + keyBytes.length + valueBytes.length];
+    ByteBuffer buffer = ByteBuffer.wrap(stitchedBytes);
+    buffer.putInt(keyBytes.length);
+    buffer.put(keyBytes);
+    buffer.putInt(valueBytes.length);
+    buffer.put(valueBytes);
+    return stitchedBytes;
   }
 
-  protected static PulsarStreamMessage buildPulsarStreamMessage(Message<byte[]> message, boolean enableKeyValueStitch,
+  public static PulsarStreamMessage buildPulsarStreamMessage(Message<byte[]> message, boolean enableKeyValueStitch,
       PulsarMetadataExtractor pulsarMetadataExtractor) {
     byte[] key = message.getKeyBytes();
-    byte[] data = enableKeyValueStitch ? stitchKeyValue(key, message.getData()) : message.getData();
-    int dataLength = (data != null) ? data.length : 0;
-    return new PulsarStreamMessage(key, data, message.getMessageId(),
-        (PulsarStreamMessageMetadata) pulsarMetadataExtractor.extract(message), dataLength);
+    byte[] value = message.getData();
+    if (enableKeyValueStitch) {
+      value = stitchKeyValue(key, value);
+    }
+    return new PulsarStreamMessage(key, value, message.getMessageId(),
+        (PulsarStreamMessageMetadata) pulsarMetadataExtractor.extract(message), value.length);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to