This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8c9234130c Fix PulsarUtils to not share buffer (#12671)
8c9234130c is described below
commit 8c9234130c938a162390b051e581e5ed849c19c6
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Mar 19 12:56:12 2024 -0700
Fix PulsarUtils to not share buffer (#12671)
---
.../pinot/plugin/stream/pulsar/PulsarUtils.java | 56 ++++++++--------------
1 file changed, 19 insertions(+), 37 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
index d22f8b0b5f..772357aa6b 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
@@ -18,77 +18,59 @@
*/
package org.apache.pinot.plugin.stream.pulsar;
-import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class PulsarUtils {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarUtils.class);
-
- private static final ByteBuffer LENGTH_BUF = ByteBuffer.allocate(4);
-
private PulsarUtils() {
}
- public static SubscriptionInitialPosition
offsetCriteriaToSubscription(OffsetCriteria offsetCriteria)
- throws IllegalArgumentException {
+ public static SubscriptionInitialPosition
offsetCriteriaToSubscription(OffsetCriteria offsetCriteria) {
if (offsetCriteria.isLargest()) {
return SubscriptionInitialPosition.Latest;
}
if (offsetCriteria.isSmallest()) {
return SubscriptionInitialPosition.Earliest;
}
-
- throw new IllegalArgumentException("Unknown initial offset value " +
offsetCriteria);
+ throw new IllegalArgumentException("Unsupported offset criteria: " +
offsetCriteria);
}
- public static MessageId offsetCriteriaToMessageId(OffsetCriteria
offsetCriteria)
- throws IllegalArgumentException {
+ public static MessageId offsetCriteriaToMessageId(OffsetCriteria
offsetCriteria) {
if (offsetCriteria.isLargest()) {
return MessageId.latest;
}
if (offsetCriteria.isSmallest()) {
return MessageId.earliest;
}
-
- throw new IllegalArgumentException("Unknown initial offset value " +
offsetCriteria);
+ throw new IllegalArgumentException("Unsupported offset criteria: " +
offsetCriteria);
}
/**
* Stitch key and value bytes together using a simple format:
* 4 bytes for key length + key bytes + 4 bytes for value length + value
bytes
*/
- protected static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
- int keyLen = keyBytes.length;
- int valueLen = valueBytes.length;
- int totalByteArrayLength = 8 + keyLen + valueLen;
- try (ByteArrayOutputStream bos = new
ByteArrayOutputStream(totalByteArrayLength)) {
- LENGTH_BUF.clear();
- bos.write(LENGTH_BUF.putInt(keyLen).array());
- bos.write(keyBytes);
- LENGTH_BUF.clear();
- bos.write(LENGTH_BUF.putInt(valueLen).array());
- bos.write(valueBytes);
- return bos.toByteArray();
- } catch (Exception e) {
- LOGGER.error("Unable to stitch key and value bytes together", e);
- }
- return null;
+ public static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
+ byte[] stitchedBytes = new byte[8 + keyBytes.length + valueBytes.length];
+ ByteBuffer buffer = ByteBuffer.wrap(stitchedBytes);
+ buffer.putInt(keyBytes.length);
+ buffer.put(keyBytes);
+ buffer.putInt(valueBytes.length);
+ buffer.put(valueBytes);
+ return stitchedBytes;
}
- protected static PulsarStreamMessage
buildPulsarStreamMessage(Message<byte[]> message, boolean enableKeyValueStitch,
+ public static PulsarStreamMessage buildPulsarStreamMessage(Message<byte[]>
message, boolean enableKeyValueStitch,
PulsarMetadataExtractor pulsarMetadataExtractor) {
byte[] key = message.getKeyBytes();
- byte[] data = enableKeyValueStitch ? stitchKeyValue(key,
message.getData()) : message.getData();
- int dataLength = (data != null) ? data.length : 0;
- return new PulsarStreamMessage(key, data, message.getMessageId(),
- (PulsarStreamMessageMetadata)
pulsarMetadataExtractor.extract(message), dataLength);
+ byte[] value = message.getData();
+ if (enableKeyValueStitch) {
+ value = stitchKeyValue(key, value);
+ }
+ return new PulsarStreamMessage(key, value, message.getMessageId(),
+ (PulsarStreamMessageMetadata)
pulsarMetadataExtractor.extract(message), value.length);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]