This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe2b013a65 Extract record keys, headers and metadata from Pulsar
sources (#10995)
fe2b013a65 is described below
commit fe2b013a657e1ad6ac508a9a37933961bc4c408b
Author: Jeff Bolle <[email protected]>
AuthorDate: Fri Jul 21 20:44:33 2023 -0400
Extract record keys, headers and metadata from Pulsar sources (#10995)
---
.../pinot/plugin/stream/pulsar/PulsarConfig.java | 43 ++++-
.../plugin/stream/pulsar/PulsarMessageBatch.java | 51 +-----
.../stream/pulsar/PulsarMetadataExtractor.java | 182 +++++++++++++++++++++
.../PulsarPartitionLevelConnectionHandler.java | 3 +-
.../pulsar/PulsarPartitionLevelConsumer.java | 26 +--
.../stream/pulsar/PulsarStreamLevelConsumer.java | 4 +-
.../plugin/stream/pulsar/PulsarStreamMessage.java | 47 ++++++
.../stream/pulsar/PulsarStreamMessageMetadata.java | 76 +++++++++
.../pinot/plugin/stream/pulsar/PulsarUtils.java | 40 +++++
.../plugin/stream/pulsar/PulsarConfigTest.java | 118 +++++++++++++
.../plugin/stream/pulsar/PulsarConsumerTest.java | 34 ++--
.../stream/pulsar/PulsarMessageBatchTest.java | 36 ++--
.../stream/pulsar/PulsarMetadataExtractorTest.java | 92 +++++++++++
.../pinot/spi/stream/StreamDataDecoderImpl.java | 9 +-
14 files changed, 669 insertions(+), 92 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
index 73ea2eca4b..4094a93f06 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
@@ -19,7 +19,13 @@
package org.apache.pinot.plugin.stream.pulsar;
import com.google.common.base.Preconditions;
+import java.util.Collections;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
@@ -37,6 +43,7 @@ public class PulsarConfig {
public static final String AUTHENTICATION_TOKEN = "authenticationToken";
public static final String TLS_TRUST_CERTS_FILE_PATH =
"tlsTrustCertsFilePath";
public static final String ENABLE_KEY_VALUE_STITCH = "enableKeyValueStitch";
+ public static final String METADATA_FIELDS = "metadataFields"; // comma-separated
list of metadata fields to extract
private final String _pulsarTopicName;
private final String _subscriberId;
@@ -45,8 +52,10 @@ public class PulsarConfig {
private final SubscriptionInitialPosition _subscriptionInitialPosition;
private final String _authenticationToken;
private final String _tlsTrustCertsFilePath;
+ @Deprecated(since = "v0.13.* since pulsar supports record key extraction")
private final boolean _enableKeyValueStitch;
-
+ private final boolean _populateMetadata;
+ private final Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue>
_metadataFields;
public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
_pulsarTopicName = streamConfig.getTopicName();
@@ -71,8 +80,32 @@ public class PulsarConfig {
_subscriptionInitialPosition =
PulsarUtils.offsetCriteriaToSubscription(offsetCriteria);
_initialMessageId = PulsarUtils.offsetCriteriaToMessageId(offsetCriteria);
+ _populateMetadata =
Boolean.parseBoolean(streamConfig.getStreamConfigsMap().getOrDefault(
+ StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.METADATA_POPULATE),
+ "false"));
+ String metadataFieldsToExtractCSV =
streamConfig.getStreamConfigsMap().getOrDefault(
+ StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
METADATA_FIELDS), "");
+ if (StringUtils.isBlank(metadataFieldsToExtractCSV) || !_populateMetadata)
{
+ _metadataFields = Collections.emptySet();
+ } else {
+ _metadataFields = parseConfigStringToEnumSet(metadataFieldsToExtractCSV);
+ }
}
+ private Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue>
parseConfigStringToEnumSet(
+ String listOfMetadataFields) {
+ try {
+ String[] metadataFieldsArr = listOfMetadataFields.split(",");
+ return Stream.of(metadataFieldsArr)
+ .map(String::trim)
+ .filter(StringUtils::isNotBlank)
+
.map(PulsarStreamMessageMetadata.PulsarMessageMetadataValue::findByKey)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid metadata fields list: " +
listOfMetadataFields, e);
+ }
+ }
public String getPulsarTopicName() {
return _pulsarTopicName;
}
@@ -100,8 +133,14 @@ public class PulsarConfig {
public String getTlsTrustCertsFilePath() {
return _tlsTrustCertsFilePath;
}
-
public boolean getEnableKeyValueStitch() {
return _enableKeyValueStitch;
}
+ public boolean isPopulateMetadata() {
+ return _populateMetadata;
+ }
+
+ public Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue>
getMetadataFields() {
+ return _metadataFields;
+ }
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
index 6df313b722..912e8bef23 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
@@ -21,16 +21,12 @@ package org.apache.pinot.plugin.stream.pulsar;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -39,13 +35,11 @@ import org.slf4j.LoggerFactory;
* plugins will not work. A custom decoder will be needed to unpack key and
value byte arrays and decode
* them independently.
*/
-public class PulsarMessageBatch implements MessageBatch<byte[]> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarMessageBatch.class);
- private final List<Message<byte[]>> _messageList = new ArrayList<>();
- private static final ByteBuffer LENGTH_BUF = ByteBuffer.allocate(4);
+public class PulsarMessageBatch implements MessageBatch<PulsarStreamMessage> {
+ private final List<PulsarStreamMessage> _messageList = new ArrayList<>();
private final boolean _enableKeyValueStitch;
- public PulsarMessageBatch(Iterable<Message<byte[]>> iterable, boolean
enableKeyValueStitch) {
+ public PulsarMessageBatch(Iterable<PulsarStreamMessage> iterable, boolean
enableKeyValueStitch) {
iterable.forEach(_messageList::add);
_enableKeyValueStitch = enableKeyValueStitch;
}
@@ -56,26 +50,19 @@ public class PulsarMessageBatch implements
MessageBatch<byte[]> {
}
@Override
- public byte[] getMessageAtIndex(int index) {
- Message<byte[]> msg = _messageList.get(index);
- if (_enableKeyValueStitch) {
- return stitchKeyValue(msg.getKeyBytes(), msg.getData());
- }
- return msg.getData();
+ public PulsarStreamMessage getMessageAtIndex(int index) {
+ return _messageList.get(index);
}
@Override
public int getMessageOffsetAtIndex(int index) {
- return ByteBuffer.wrap(_messageList.get(index).getData()).arrayOffset();
+ return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset();
}
@Override
public int getMessageLengthAtIndex(int index) {
- if (_enableKeyValueStitch) {
- Message<byte[]> msg = _messageList.get(index);
- return 8 + msg.getKeyBytes().length + msg.getData().length;
- }
- return _messageList.get(index).getData().length;
+ return _messageList.get(index).getValue().length; // if
_enableKeyValueStitch is true, key and value were
+ // already stitched together when the consumer built the message; otherwise
the value is the raw message payload
}
/**
@@ -123,26 +110,4 @@ public class PulsarMessageBatch implements
MessageBatch<byte[]> {
public long getNextStreamMessageOffsetAtIndex(int index) {
throw new UnsupportedOperationException("Pulsar does not support long
stream offsets");
}
-
- /**
- * Stitch key and value bytes together using a simple format:
- * 4 bytes for key length + key bytes + 4 bytes for value length + value
bytes
- */
- private byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
- int keyLen = keyBytes.length;
- int valueLen = valueBytes.length;
- int totalByteArrayLength = 8 + keyLen + valueLen;
- try (ByteArrayOutputStream bos = new
ByteArrayOutputStream(totalByteArrayLength)) {
- LENGTH_BUF.clear();
- bos.write(LENGTH_BUF.putInt(keyLen).array());
- bos.write(keyBytes);
- LENGTH_BUF.clear();
- bos.write(LENGTH_BUF.putInt(valueLen).array());
- bos.write(valueBytes);
- return bos.toByteArray();
- } catch (Exception e) {
- LOGGER.error("Unable to stitch key and value bytes together", e);
- }
- return null;
- }
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java
new file mode 100644
index 0000000000..c33c32e7cb
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pulsar.client.api.Message;
+
+public interface PulsarMetadataExtractor {
+ static PulsarMetadataExtractor build(boolean populateMetadata,
+ Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue>
metadataValuesToExtract) {
+ return message -> {
+ long publishTime = message.getPublishTime();
+ long brokerPublishTime = message.getBrokerPublishTime().orElse(0L);
+ long recordTimestamp = brokerPublishTime != 0 ? brokerPublishTime :
publishTime;
+
+ Map<String, String> metadataMap = populateMetadataMap(populateMetadata,
message, metadataValuesToExtract);
+
+ GenericRow headerGenericRow = populateMetadata ?
buildGenericRow(message) : null;
+ return new PulsarStreamMessageMetadata(recordTimestamp,
headerGenericRow, metadataMap);
+ };
+ }
+
+ RowMetadata extract(Message<?> record);
+
+ static GenericRow buildGenericRow(Message<?> message) {
+ if (MapUtils.isEmpty(message.getProperties())) {
+ return null;
+ }
+ GenericRow genericRow = new GenericRow();
+ for (Map.Entry<String, String> entry : message.getProperties().entrySet())
{
+ genericRow.putValue(entry.getKey(), entry.getValue());
+ }
+ return genericRow;
+ }
+
+ static Map<String, String> populateMetadataMap(boolean populateAllFields,
Message<?> message,
+ Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue>
metadataValuesToExtract) {
+
+ Map<String, String> metadataMap = new HashMap<>();
+
populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME,
message, metadataMap);
+
populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME,
message, metadataMap);
+
populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.BROKER_PUBLISH_TIME,
message,
+ metadataMap);
+
populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY,
message, metadataMap);
+
+ // The timestamps (needed for lag calculation) and message key above are populated
even if populateAllFields is false
+
+ if (!populateAllFields) {
+ return metadataMap;
+ }
+
+ for (PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue
: metadataValuesToExtract) {
+ populateMetadataField(metadataValue, message, metadataMap);
+ }
+
+ return metadataMap;
+ }
+
+ private static void
populateMetadataField(PulsarStreamMessageMetadata.PulsarMessageMetadataValue
value,
+ Message<?> message, Map<String, String> metadataMap) {
+ switch (value) {
+ case PUBLISH_TIME:
+ long publishTime = message.getPublishTime();
+ if (publishTime > 0) {
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME,
+ publishTime);
+ }
+ break;
+ case EVENT_TIME:
+ long eventTime = message.getEventTime();
+ if (eventTime > 0) {
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME,
+ eventTime);
+ }
+ break;
+ case BROKER_PUBLISH_TIME:
+ message.getBrokerPublishTime()
+ .ifPresent(brokerPublishTime -> setMetadataMapField(metadataMap,
+
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.BROKER_PUBLISH_TIME,
brokerPublishTime));
+ break;
+ case MESSAGE_KEY:
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY,
+ message.getKey());
+ break;
+ case MESSAGE_ID:
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID,
+ message.getMessageId().toString());
+ break;
+ case MESSAGE_ID_BYTES_B64:
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64,
+ message.getMessageId().toByteArray());
+ break;
+ case PRODUCER_NAME:
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PRODUCER_NAME,
+ message.getProducerName());
+ break;
+ case SCHEMA_VERSION:
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SCHEMA_VERSION,
+ message.getSchemaVersion());
+ break;
+ case SEQUENCE_ID:
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SEQUENCE_ID,
+ message.getSequenceId());
+ break;
+ case ORDERING_KEY:
+ if (message.hasOrderingKey()) {
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.ORDERING_KEY,
+ message.getOrderingKey());
+ }
+ break;
+ case SIZE:
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.SIZE,
+ message.size());
+ break;
+ case TOPIC_NAME:
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.TOPIC_NAME,
+ message.getTopicName());
+ break;
+ case INDEX:
+ message.getIndex().ifPresent(index -> setMetadataMapField(metadataMap,
+ PulsarStreamMessageMetadata.PulsarMessageMetadataValue.INDEX,
index));
+ break;
+ case REDELIVERY_COUNT:
+ setMetadataMapField(metadataMap,
PulsarStreamMessageMetadata.PulsarMessageMetadataValue.REDELIVERY_COUNT,
+ message.getRedeliveryCount());
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported metadata value: " +
value);
+ }
+ }
+
+ private static void setMetadataMapField(Map<String, String> metadataMap,
+ PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+ String value) {
+ if (StringUtils.isNotBlank(value)) {
+ metadataMap.put(metadataValue.getKey(), value);
+ }
+ }
+
+ private static void setMetadataMapField(Map<String, String> metadataMap,
+ PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+ int value) {
+ setMetadataMapField(metadataMap, metadataValue, String.valueOf(value));
+ }
+
+ private static void setMetadataMapField(Map<String, String> metadataMap,
+ PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+ long value) {
+ setMetadataMapField(metadataMap, metadataValue, String.valueOf(value));
+ }
+
+ private static void setMetadataMapField(Map<String, String> metadataMap,
+ PulsarStreamMessageMetadata.PulsarMessageMetadataValue metadataValue,
+ byte[] value) {
+ if (value != null && value.length > 0) {
+ setMetadataMapField(metadataMap, metadataValue,
Base64.getEncoder().encodeToString(value));
+ }
+ }
+}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
index 3ad57b55bb..11033ec716 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConnectionHandler.java
@@ -40,6 +40,7 @@ public class PulsarPartitionLevelConnectionHandler {
protected final PulsarConfig _config;
protected final String _clientId;
protected PulsarClient _pulsarClient = null;
+ protected final PulsarMetadataExtractor _pulsarMetadataExtractor;
/**
* Creates a new instance of {@link PulsarClient} and {@link Reader}
@@ -47,7 +48,7 @@ public class PulsarPartitionLevelConnectionHandler {
public PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig
streamConfig) {
_config = new PulsarConfig(streamConfig, clientId);
_clientId = clientId;
-
+ _pulsarMetadataExtractor =
PulsarMetadataExtractor.build(_config.isPopulateMetadata(),
_config.getMetadataFields());
try {
ClientBuilder pulsarClientBuilder =
PulsarClient.builder().serviceUrl(_config.getBootstrapServers());
if (_config.getTlsTrustCertsFilePath() != null) {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
index 0e55a07aa3..d1b80b0360 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
@@ -48,15 +47,15 @@ public class PulsarPartitionLevelConsumer extends
PulsarPartitionLevelConnection
private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
private final ExecutorService _executorService;
private final Reader _reader;
- private boolean _enableKeyValueStitch = false;
+ private boolean _enableKeyValueStitch;
public PulsarPartitionLevelConsumer(String clientId, StreamConfig
streamConfig,
PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
super(clientId, streamConfig);
PulsarConfig config = new PulsarConfig(streamConfig, clientId);
- _reader = createReaderForPartition(config.getPulsarTopicName(),
- partitionGroupConsumptionStatus.getPartitionGroupId(),
- config.getInitialMessageId());
+ _reader =
+ createReaderForPartition(config.getPulsarTopicName(),
partitionGroupConsumptionStatus.getPartitionGroupId(),
+ config.getInitialMessageId());
LOGGER.info("Created pulsar reader with id {} for topic {} partition {}",
_reader, _config.getPulsarTopicName(),
partitionGroupConsumptionStatus.getPartitionGroupId());
_executorService = Executors.newSingleThreadExecutor();
@@ -64,19 +63,19 @@ public class PulsarPartitionLevelConsumer extends
PulsarPartitionLevelConnection
}
/**
- * Fetch records from the Pulsar stream between the start and end
KinesisCheckpoint
+ * Fetch records from the Pulsar stream between the start and end
StreamPartitionMsgOffset
* Used {@link org.apache.pulsar.client.api.Reader} to read the messaged
from pulsar partitioned topic
* The reader seeks to the startMsgOffset and starts reading records in a
loop until endMsgOffset or timeout is
* reached.
*/
@Override
- public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset,
StreamPartitionMsgOffset endMsgOffset,
- int timeoutMillis) {
+ public PulsarMessageBatch fetchMessages(StreamPartitionMsgOffset
startMsgOffset,
+ StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
final MessageId startMessageId = ((MessageIdStreamOffset)
startMsgOffset).getMessageId();
final MessageId endMessageId =
endMsgOffset == null ? MessageId.latest : ((MessageIdStreamOffset)
endMsgOffset).getMessageId();
- List<Message<byte[]>> messagesList = new ArrayList<>();
+ List<PulsarStreamMessage> messagesList = new ArrayList<>();
Future<PulsarMessageBatch> pulsarResultFuture =
_executorService.submit(() -> fetchMessages(startMessageId,
endMessageId, messagesList));
@@ -96,7 +95,7 @@ public class PulsarPartitionLevelConsumer extends
PulsarPartitionLevelConnection
}
public PulsarMessageBatch fetchMessages(MessageId startMessageId, MessageId
endMessageId,
- List<Message<byte[]>> messagesList) {
+ List<PulsarStreamMessage> messagesList) {
try {
_reader.seek(startMessageId);
@@ -108,7 +107,8 @@ public class PulsarPartitionLevelConsumer extends
PulsarPartitionLevelConnection
break;
}
}
- messagesList.add(nextMessage);
+ messagesList.add(
+ PulsarUtils.buildPulsarStreamMessage(nextMessage,
_enableKeyValueStitch, _pulsarMetadataExtractor));
if (Thread.interrupted()) {
break;
@@ -124,11 +124,11 @@ public class PulsarPartitionLevelConsumer extends
PulsarPartitionLevelConnection
}
}
- private Iterable<Message<byte[]>> buildOffsetFilteringIterable(final
List<Message<byte[]>> messageAndOffsets,
+ private Iterable<PulsarStreamMessage> buildOffsetFilteringIterable(final
List<PulsarStreamMessage> messageAndOffsets,
final MessageId startOffset, final MessageId endOffset) {
return Iterables.filter(messageAndOffsets, input -> {
// Filter messages that are either null or have an offset ∉
[startOffset, endOffset]
- return input != null && input.getData() != null &&
(input.getMessageId().compareTo(startOffset) >= 0) && (
+ return input != null && input.getValue() != null &&
(input.getMessageId().compareTo(startOffset) >= 0) && (
(endOffset == null) || (input.getMessageId().compareTo(endOffset) <
0));
});
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
index 60272c6212..82040f6de3 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java
@@ -81,9 +81,9 @@ public class PulsarStreamLevelConsumer implements
StreamLevelConsumer {
// Log every minute or 100k events
if (now - _lastLogTime > 60000 || _currentCount - _lastCount >=
100000) {
if (_lastCount == 0) {
- _logger.info("Consumed {} events from kafka stream {}",
_currentCount, _streamConfig.getTopicName());
+ _logger.info("Consumed {} events from pulsar stream {}",
_currentCount, _streamConfig.getTopicName());
} else {
- _logger.info("Consumed {} events from kafka stream {}
(rate:{}/s)", _currentCount - _lastCount,
+ _logger.info("Consumed {} events from pulsar stream {}
(rate:{}/s)", _currentCount - _lastCount,
_streamConfig.getTopicName(), (float) (_currentCount -
_lastCount) * 1000 / (now - _lastLogTime));
}
_lastCount = _currentCount;
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java
new file mode 100644
index 0000000000..7e09197857
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pulsar.client.api.MessageId;
+
+public class PulsarStreamMessage extends StreamMessage<byte[]> {
+
+ private final MessageId _messageId;
+ public PulsarStreamMessage(@Nullable byte[] key, byte[] value, MessageId
messageId,
+ @Nullable PulsarStreamMessageMetadata
metadata, int length) {
+ super(key, value, metadata, length);
+ _messageId = messageId;
+ }
+
+ public MessageId getMessageId() {
+ return _messageId;
+ }
+
+ int getKeyLength() {
+ byte[] key = getKey();
+ return key == null ? 0 : key.length;
+ }
+
+ int getValueLength() {
+ byte[] value = getValue();
+ return value == null ? 0 : value.length;
+ }
+}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java
new file mode 100644
index 0000000000..59220138d7
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.EnumSet;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+
+/**
+ * Pulsar specific implementation of {@link StreamMessageMetadata}
+ * Pulsar makes many metadata values available for each message. Please see
the pulsar documentation for more details.
+ * @see <a
href="https://pulsar.apache.org/docs/en/concepts-messaging/#message-properties">Pulsar
Message Properties</a>
+ */
+public class PulsarStreamMessageMetadata extends StreamMessageMetadata {
+
+ public enum PulsarMessageMetadataValue {
+ PUBLISH_TIME("publishTime"),
+ EVENT_TIME("eventTime"),
+ BROKER_PUBLISH_TIME("brokerPublishTime"),
+ MESSAGE_KEY("key"),
+ MESSAGE_ID("messageId"),
+ MESSAGE_ID_BYTES_B64("messageIdBytes"),
+ PRODUCER_NAME("producerName"),
+ SCHEMA_VERSION("schemaVersion"),
+ SEQUENCE_ID("sequenceId"),
+ ORDERING_KEY("orderingKey"),
+ SIZE("size"),
+ TOPIC_NAME("topicName"),
+ INDEX("index"),
+ REDELIVERY_COUNT("redeliveryCount");
+
+ private final String _key;
+
+ PulsarMessageMetadataValue(String key) {
+ _key = key;
+ }
+
+ public String getKey() {
+ return _key;
+ }
+
+ public static PulsarMessageMetadataValue findByKey(final String key) {
+ EnumSet<PulsarMessageMetadataValue> values =
EnumSet.allOf(PulsarMessageMetadataValue.class);
+ return values.stream().filter(value ->
value.getKey().equals(key)).findFirst().orElse(null);
+ }
+ }
+
+ public PulsarStreamMessageMetadata(long recordIngestionTimeMs,
+ @Nullable GenericRow headers) {
+ super(recordIngestionTimeMs, headers);
+ }
+
+ public PulsarStreamMessageMetadata(long recordIngestionTimeMs, @Nullable
GenericRow headers,
+ Map<String, String> metadata) {
+ super(recordIngestionTimeMs, headers, metadata);
+ }
+}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
index 763b0fc0d4..d22f8b0b5f 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java
@@ -18,13 +18,15 @@
*/
package org.apache.pinot.plugin.stream.pulsar;
+import java.nio.ByteBuffer;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
public class PulsarUtils {
private PulsarUtils() {
}
@@ -51,4 +53,46 @@ public class PulsarUtils {
throw new IllegalArgumentException("Unknown initial offset value " +
offsetCriteria);
}
+
+  /**
+   * Stitches key and value bytes together using a simple length-prefixed format:
+   * 4 bytes for key length + key bytes + 4 bytes for value length + value bytes.
+   * Both lengths are written big-endian, the {@link ByteBuffer} default byte order.
+   *
+   * <p>The output buffer is allocated per call and no mutable state is shared between calls, so concurrent
+   * callers cannot corrupt each other's length prefixes. A {@code null} key or value is encoded as a
+   * zero-length entry.
+   *
+   * @param keyBytes message key bytes; may be {@code null}
+   * @param valueBytes message payload bytes; may be {@code null}
+   * @return the stitched bytes, never {@code null}
+   */
+  protected static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
+    byte[] key = (keyBytes != null) ? keyBytes : new byte[0];
+    byte[] value = (valueBytes != null) ? valueBytes : new byte[0];
+    return ByteBuffer.allocate(2 * Integer.BYTES + key.length + value.length)
+        .putInt(key.length)
+        .put(key)
+        .putInt(value.length)
+        .put(value)
+        .array();
+  }
+
+  /**
+   * Converts a raw Pulsar message into a {@link PulsarStreamMessage}.
+   *
+   * @param message the Pulsar message to convert
+   * @param enableKeyValueStitch when true, the stream message value is the message key and payload combined by
+   *        {@link #stitchKeyValue(byte[], byte[])}; otherwise it is the raw payload
+   * @param pulsarMetadataExtractor produces the {@link PulsarStreamMessageMetadata} attached to the message
+   * @return the stream message; its value length is 0 when the value is {@code null}
+   */
+  protected static PulsarStreamMessage buildPulsarStreamMessage(Message<byte[]> message, boolean enableKeyValueStitch,
+      PulsarMetadataExtractor pulsarMetadataExtractor) {
+    byte[] key = message.getKeyBytes();
+    byte[] data = enableKeyValueStitch ? stitchKeyValue(key, message.getData()) : message.getData();
+    int dataLength = (data != null) ? data.length : 0;
+    return new PulsarStreamMessage(key, data, message.getMessageId(),
+        (PulsarStreamMessageMetadata) pulsarMetadataExtractor.extract(message), dataLength);
+  }
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
new file mode 100644
index 0000000000..ad23c83ce0
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfigTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests how {@link PulsarConfig} parses the metadata-related stream properties
+ * ({@link StreamConfigProperties#METADATA_POPULATE} and {@link PulsarConfig#METADATA_FIELDS}).
+ */
+public class PulsarConfigTest {
+  public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
+
+  public static final String STREAM_TYPE = "pulsar";
+  public static final String STREAM_PULSAR_BROKER_LIST = "pulsar://localhost:6650";
+  public static final String STREAM_PULSAR_CONSUMER_TYPE = "simple";
+
+  // Six field names with irregular spacing and a trailing empty entry, which the parser is expected to skip.
+  private static final String CONFIGURED_METADATA_FIELDS =
+      "messageId,messageIdBytes, publishTime, eventTime, key, topicName, ";
+
+  /** Prefixes {@code propertyKey} with the stream-type namespace used by these tests. */
+  private static String streamProperty(String propertyKey) {
+    return StreamConfigProperties.constructStreamProperty(STREAM_TYPE, propertyKey);
+  }
+
+  /** Returns a mutable stream-config map holding the properties shared by every test in this class. */
+  Map<String, String> getCommonStreamConfigMap() {
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", STREAM_TYPE);
+    streamConfigMap.put("stream.pulsar.consumer.type", STREAM_PULSAR_CONSUMER_TYPE);
+    streamConfigMap.put("stream.pulsar.topic.name", "test-topic");
+    streamConfigMap.put("stream.pulsar.bootstrap.servers", STREAM_PULSAR_BROKER_LIST);
+    streamConfigMap.put("stream.pulsar.consumer.prop.auto.offset.reset", "smallest");
+    streamConfigMap.put("stream.pulsar.consumer.factory.class.name", PulsarConsumerFactory.class.getName());
+    streamConfigMap.put(streamProperty(StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS), "1000");
+    streamConfigMap.put("stream.pulsar.decoder.class.name", "decoderClass");
+    return streamConfigMap;
+  }
+
+  /**
+   * Builds a {@link PulsarConfig} from the common stream config plus the metadata settings under test.
+   *
+   * @param populateMetadata value stored under {@link StreamConfigProperties#METADATA_POPULATE}
+   * @param metadataFields value stored under {@link PulsarConfig#METADATA_FIELDS}, or {@code null} to leave it unset
+   */
+  private PulsarConfig buildPulsarConfig(boolean populateMetadata, String metadataFields)
+      throws Exception {
+    Map<String, String> streamConfigMap = getCommonStreamConfigMap();
+    streamConfigMap.put(streamProperty(StreamConfigProperties.METADATA_POPULATE), String.valueOf(populateMetadata));
+    if (metadataFields != null) {
+      streamConfigMap.put(streamProperty(PulsarConfig.METADATA_FIELDS), metadataFields);
+    }
+    return new PulsarConfig(new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap), "testId");
+  }
+
+  @Test
+  public void testParsingMetadataConfigWithConfiguredFields()
+      throws Exception {
+    Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataFieldsToExtract =
+        buildPulsarConfig(true, CONFIGURED_METADATA_FIELDS).getMetadataFields();
+    Assert.assertEquals(metadataFieldsToExtract.size(), 6);
+    Assert.assertTrue(metadataFieldsToExtract.containsAll(List.of(
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.PUBLISH_TIME,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.EVENT_TIME,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY,
+        PulsarStreamMessageMetadata.PulsarMessageMetadataValue.TOPIC_NAME)));
+  }
+
+  @Test
+  public void testParsingMetadataConfigWithoutConfiguredFields()
+      throws Exception {
+    PulsarConfig pulsarConfig = buildPulsarConfig(true, null);
+    Assert.assertEquals(pulsarConfig.getMetadataFields().size(), 0);
+  }
+
+  @Test
+  public void testParsingNoMetadataConfig()
+      throws Exception {
+    PulsarConfig pulsarConfig = buildPulsarConfig(false, null);
+    Assert.assertFalse(pulsarConfig.isPopulateMetadata());
+    Assert.assertEquals(pulsarConfig.getMetadataFields().size(), 0);
+  }
+
+  @Test
+  public void testParsingNoMetadataConfigWithConfiguredFields()
+      throws Exception {
+    // Fields are listed but metadata population is disabled, so none should be selected.
+    PulsarConfig pulsarConfig = buildPulsarConfig(false, CONFIGURED_METADATA_FIELDS);
+    Assert.assertFalse(pulsarConfig.isPopulateMetadata());
+    Assert.assertEquals(pulsarConfig.getMetadataFields().size(), 0);
+  }
+}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
index 4779e6afcd..9d59f82fcc 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
@@ -281,34 +281,35 @@ public class PulsarConsumerTest {
int totalMessagesReceived = 0;
- final PartitionGroupConsumer consumer =
- streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID,
partitionGroupConsumptionStatus);
- final MessageBatch messageBatch1 = consumer.fetchMessages(new
MessageIdStreamOffset(MessageId.earliest),
+ final PulsarPartitionLevelConsumer consumer =
+ (PulsarPartitionLevelConsumer) streamConsumerFactory
+ .createPartitionGroupConsumer(CLIENT_ID,
partitionGroupConsumptionStatus);
+ final PulsarMessageBatch messageBatch1 = consumer.fetchMessages(new
MessageIdStreamOffset(MessageId.earliest),
new
MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)),
CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch1.getMessageCount(), 500);
for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
+ final byte[] msg = messageBatch1.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + i);
totalMessagesReceived++;
}
- final MessageBatch messageBatch2 =
+ final PulsarMessageBatch messageBatch2 =
consumer.fetchMessages(new
MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 500)), null,
CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch2.getMessageCount(), 500);
for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
+ final byte[] msg = messageBatch2.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
totalMessagesReceived++;
}
- final MessageBatch messageBatch3 =
+ final PulsarMessageBatch messageBatch3 =
consumer.fetchMessages(new
MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 10)),
new
MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, 35)),
CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch3.getMessageCount(), 25);
for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
+ final byte[] msg = messageBatch3.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
}
@@ -333,36 +334,37 @@ public class PulsarConsumerTest {
int totalMessagesReceived = 0;
- final PartitionGroupConsumer consumer =
- streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID,
partitionGroupConsumptionStatus);
+ final PulsarPartitionLevelConsumer consumer =
+ (PulsarPartitionLevelConsumer)
streamConsumerFactory.createPartitionGroupConsumer(CLIENT_ID,
+ partitionGroupConsumptionStatus);
//TODO: This test failed, check it out.
- final MessageBatch messageBatch1 = consumer.fetchMessages(new
MessageIdStreamOffset(MessageId.earliest),
+ final PulsarMessageBatch messageBatch1 = consumer.fetchMessages(new
MessageIdStreamOffset(MessageId.earliest),
new
MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)),
CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch1.getMessageCount(), 500);
for (int i = 0; i < messageBatch1.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch1.getMessageAtIndex(i);
+ final byte[] msg = messageBatch1.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + i);
totalMessagesReceived++;
}
- final MessageBatch messageBatch2 =
+ final PulsarMessageBatch messageBatch2 =
consumer.fetchMessages(new
MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 500)),
null,
CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch2.getMessageCount(), 500);
for (int i = 0; i < messageBatch2.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch2.getMessageAtIndex(i);
+ final byte[] msg = messageBatch2.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
totalMessagesReceived++;
}
- final MessageBatch messageBatch3 =
+ final PulsarMessageBatch messageBatch3 =
consumer.fetchMessages(new
MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 10)),
new
MessageIdStreamOffset(getBatchMessageIdForPartitionAndIndex(partition, 35)),
CONSUMER_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(messageBatch3.getMessageCount(), 25);
for (int i = 0; i < messageBatch3.getMessageCount(); i++) {
- final byte[] msg = (byte[]) messageBatch3.getMessageAtIndex(i);
+ final byte[] msg = messageBatch3.getMessageAtIndex(i).getValue();
Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
index 7cc0a99a6c..904dd33a04 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
@@ -20,10 +20,13 @@ package org.apache.pinot.plugin.stream.pulsar;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
@@ -37,30 +40,33 @@ public class PulsarMessageBatchTest {
private DummyPulsarMessage _msgWithKeyAndValue;
private byte[] _expectedValueBytes;
private byte[] _expectedKeyBytes;
- private List<Message<byte[]>> _messageList;
+ private List<DummyPulsarMessage> _messageList;
+ private PulsarMetadataExtractor _metadataExtractor;
- class DummyPulsarMessage implements Message<byte[]> {
+ public static class DummyPulsarMessage implements Message<byte[]> {
private final byte[] _keyData;
private final byte[] _valueData;
+ private Map<String, String> _properties;
public DummyPulsarMessage(byte[] key, byte[] value) {
_keyData = key;
_valueData = value;
+ _properties = new HashMap<>();
}
@Override
public Map<String, String> getProperties() {
- return null;
+ return _properties;
}
@Override
public boolean hasProperty(String name) {
- return false;
+ return _properties.containsKey(name);
}
@Override
public String getProperty(String name) {
- return null;
+ return _properties.get(name);
}
@Override
@@ -80,7 +86,7 @@ public class PulsarMessageBatchTest {
@Override
public MessageId getMessageId() {
- return null;
+ return MessageId.earliest;
}
@Override
@@ -110,7 +116,7 @@ public class PulsarMessageBatchTest {
@Override
public String getKey() {
- return _keyData.toString();
+ return new String(_keyData);
}
@Override
@@ -196,20 +202,28 @@ public class PulsarMessageBatchTest {
_random.nextBytes(_expectedKeyBytes);
_msgWithKeyAndValue = new DummyPulsarMessage(_expectedKeyBytes,
_expectedValueBytes);
_messageList = new ArrayList<>();
+ _metadataExtractor = PulsarMetadataExtractor.build(true,
+
EnumSet.allOf(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.class));
_messageList.add(_msgWithKeyAndValue);
}
@Test
public void testMessageBatchNoStitching() {
- PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList,
false);
- byte[] valueBytes = messageBatch.getMessageAtIndex(0);
+ List<PulsarStreamMessage> streamMessages =
_messageList.stream().map(message ->
+ PulsarUtils.buildPulsarStreamMessage(message, false,
_metadataExtractor))
+ .collect(Collectors.toList());
+ PulsarMessageBatch messageBatch = new PulsarMessageBatch(streamMessages,
false);
+ byte[] valueBytes = messageBatch.getMessageAtIndex(0).getValue();
Assert.assertArrayEquals(_expectedValueBytes, valueBytes);
}
@Test
public void testMessageBatchWithStitching() {
- PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList,
true);
- byte[] keyValueBytes = messageBatch.getMessageAtIndex(0);
+ List<PulsarStreamMessage> streamMessages =
_messageList.stream().map(message ->
+ PulsarUtils.buildPulsarStreamMessage(message, true,
_metadataExtractor))
+ .collect(Collectors.toList());
+ PulsarMessageBatch messageBatch = new PulsarMessageBatch(streamMessages,
true);
+ byte[] keyValueBytes = messageBatch.getMessageAtIndex(0).getValue();
Assert.assertEquals(keyValueBytes.length, 8 + _expectedKeyBytes.length +
_expectedValueBytes.length);
try {
ByteBuffer byteBuffer = ByteBuffer.wrap(keyValueBytes);
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java
new file mode 100644
index 0000000000..4c8e28021a
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractorTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Set;
+import org.apache.pulsar.client.api.MessageId;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.plugin.stream.pulsar.PulsarMessageBatchTest.DummyPulsarMessage;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64;
+import static org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY;
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Tests {@link PulsarMetadataExtractor} and the key/value/length fields produced by
+ * {@link PulsarUtils#buildPulsarStreamMessage}.
+ *
+ * <p>TestNG's {@code assertEquals} takes {@code (actual, expected)}; the assertions below use that order so that
+ * failure messages report the two values the right way round.
+ */
+public class PulsarMetadataExtractorTest {
+
+  private PulsarMetadataExtractor _metadataExtractor;
+
+  @BeforeClass
+  public void setup() {
+    // Request exactly the three metadata fields asserted in testExtractProperty.
+    _metadataExtractor = PulsarMetadataExtractor.build(true, Set.of(MESSAGE_ID, MESSAGE_ID_BYTES_B64, MESSAGE_KEY));
+  }
+
+  @Test
+  public void testExtractProperty()
+      throws Exception {
+    DummyPulsarMessage pulsarMessage =
+        new DummyPulsarMessage("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8));
+    pulsarMessage.getProperties().put("test_key", "test_value");
+    pulsarMessage.getProperties().put("test_key2", "2");
+    PulsarStreamMessageMetadata metadata = (PulsarStreamMessageMetadata) _metadataExtractor.extract(pulsarMessage);
+
+    // Message properties surface as headers.
+    assertEquals(metadata.getHeaders().getValue("test_key"), "test_value");
+    assertEquals(metadata.getHeaders().getValue("test_key2"), "2");
+
+    // Requested fields surface as record metadata, keyed by PulsarMessageMetadataValue#getKey().
+    assertEquals(metadata.getRecordMetadata().get(MESSAGE_KEY.getKey()), "key");
+    String messageIdStr = metadata.getRecordMetadata().get(MESSAGE_ID.getKey());
+    assertEquals(messageIdStr, pulsarMessage.getMessageId().toString());
+
+    // The Base64-encoded message id bytes must decode back to the original MessageId.
+    String messageIdB64 = metadata.getRecordMetadata().get(MESSAGE_ID_BYTES_B64.getKey());
+    MessageId messageId = MessageId.fromByteArray(Base64.getDecoder().decode(messageIdB64));
+    assertEquals(messageId, MessageId.earliest);
+  }
+
+  @Test
+  public void testPulsarStreamMessageUnstitched() {
+    String key = "key";
+    String value = "value";
+    DummyPulsarMessage dummyPulsarMessage =
+        new DummyPulsarMessage(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    PulsarStreamMessage streamMessage =
+        PulsarUtils.buildPulsarStreamMessage(dummyPulsarMessage, false, _metadataExtractor);
+    assertEquals(streamMessage.getKey(), key.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getValue(), value.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getKeyLength(), key.getBytes(StandardCharsets.UTF_8).length);
+    assertEquals(streamMessage.getValueLength(), value.getBytes(StandardCharsets.UTF_8).length);
+  }
+
+  @Test
+  public void testPulsarStreamMessageStitched() {
+    String key = "key";
+    String value = "value";
+    byte[] stitchedValueBytes =
+        PulsarUtils.stitchKeyValue(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    DummyPulsarMessage dummyPulsarMessage =
+        new DummyPulsarMessage(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
+    PulsarStreamMessage streamMessage =
+        PulsarUtils.buildPulsarStreamMessage(dummyPulsarMessage, true, _metadataExtractor);
+    assertEquals(streamMessage.getKey(), key.getBytes(StandardCharsets.UTF_8));
+    assertEquals(streamMessage.getValue(), stitchedValueBytes);
+    assertEquals(streamMessage.getKeyLength(), key.getBytes(StandardCharsets.UTF_8).length);
+    assertEquals(streamMessage.getValueLength(), stitchedValueBytes.length);
+  }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
index 97958b92d3..b570067a69 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
@@ -50,10 +50,11 @@ public class StreamDataDecoderImpl implements
StreamDataDecoder {
row.putValue(KEY, new String(message.getKey(),
StandardCharsets.UTF_8));
}
RowMetadata metadata = message.getMetadata();
- if (metadata != null && metadata.getHeaders() != null) {
- metadata.getHeaders().getFieldToValueMap()
- .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key,
value));
-
+ if (metadata != null) {
+ if (metadata.getHeaders() != null) {
+ metadata.getHeaders().getFieldToValueMap()
+ .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX +
key, value));
+ }
metadata.getRecordMetadata()
.forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX +
key, value));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]