This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 737d443f60 Extract record keys, headers and metadata from Stream sources (#9224) 737d443f60 is described below commit 737d443f601703ce17d1e3b8a27bd853c88cfa57 Author: Navina Ramesh <nav...@apache.org> AuthorDate: Wed Sep 28 11:04:56 2022 +0530 Extract record keys, headers and metadata from Stream sources (#9224) * initial commit from kishore adding StreamMessage concept and using it with kafka move StreamMessage into pinot-spi Use StreamDataDecoder interface for decoding a StreamMessage in LLRealtimeSegmentDataManager verified key and header with realtime quick start checkstyle updating realtime quickstart to include headers and key parsing fix failing tests checkstyle revert to continue using hlc in integration tests added TODO for failing test; revert adding header in the integ test nit Deleting unused class: MessageAndOffset and MessageAndOffsetAndMetadata * Adding metadata to RowMetadata to distinguish it from headers Addressing feedback * clear reuse genericrow * Making offset as a part of KafkaStreamMessageMetadata; changed StreamMessage into a concrete class * rename RowMetadataExtractor.java to KafkaMetadataExtractor.java * adding unit test addressing PR feedback * Always populate offset and ingestion time in record time * update the comment * rename to recordIngestionTimeMs; Fixed unit test assertion in KafkaPartition consumer - Kafka will always have metadata * include kafka recordTimestamp in streamMessageMetadata object * do not use Map.of --- .../common/function/scalar/StringFunctions.java | 32 ++++- .../realtime/LLRealtimeSegmentDataManager.java | 35 +++--- .../tests/HybridClusterIntegrationTest.java | 1 + .../plugin/stream/kafka20/KafkaMessageBatch.java | 31 +++-- .../stream/kafka20/KafkaMetadataExtractor.java | 57 +++++++++ .../KafkaPartitionLevelConnectionHandler.java | 4 +- .../kafka20/KafkaPartitionLevelConsumer.java | 16 ++- 
.../plugin/stream/kafka20/KafkaStreamMessage.java} | 39 ++---- ...ractor.java => KafkaStreamMessageMetadata.java} | 18 +-- .../stream/kafka20/server/KafkaDataProducer.java | 38 ++++++ .../kafka20/KafkaPartitionLevelConsumerTest.java | 24 ++-- .../indexsegment/mutable/MutableSegmentImpl.java | 2 +- .../indexsegment/mutable/IndexingFailureTest.java | 2 +- .../MutableSegmentImplAggregateMetricsTest.java | 2 +- ...MutableSegmentImplIngestionAggregationTest.java | 2 +- .../mutable/MutableSegmentImplRawMVTest.java | 2 +- .../mutable/MutableSegmentImplTest.java | 2 +- .../org/apache/pinot/spi/stream/MessageBatch.java | 15 +++ .../org/apache/pinot/spi/stream/RowMetadata.java | 36 +++++- ...MessageMetadata.java => StreamDataDecoder.java} | 26 ++-- .../pinot/spi/stream/StreamDataDecoderImpl.java | 70 +++++++++++ .../pinot/spi/stream/StreamDataDecoderResult.java | 33 +++--- .../pinot/spi/stream/StreamDataProducer.java | 31 +++-- .../org/apache/pinot/spi/stream/StreamMessage.java | 69 +++++++++++ .../pinot/spi/stream/StreamMessageMetadata.java | 39 ++++-- .../spi/stream/StreamDataDecoderImplTest.java | 132 +++++++++++++++++++++ .../pinot/spi/stream/StreamMessageTest.java} | 36 +++--- .../org/apache/pinot/tools/QuickStartBase.java | 2 +- .../pinot/tools/streams/PinotRealtimeSource.java | 2 +- .../meetupRsvp_realtime_table_config.json | 3 +- .../stream/meetupRsvp/meetupRsvp_schema.json | 8 ++ 31 files changed, 651 insertions(+), 158 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java index 4277cec152..59862ccd86 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java @@ -384,6 +384,36 @@ public class StringFunctions { return new String(result); } + /** + * @param bytes + * @param charsetName 
encoding + * @return bytearray to string + * returns null on exception + */ + @ScalarFunction + public static String fromBytes(byte[] bytes, String charsetName) { + try { + return new String(bytes, charsetName); + } catch (UnsupportedEncodingException e) { + return null; + } + } + + /** + * @param input + * @param charsetName encoding + * @return bytearray to string + * returns null on exception + */ + @ScalarFunction + public static byte[] toBytes(String input, String charsetName) { + try { + return input.getBytes(charsetName); + } catch (UnsupportedEncodingException e) { + return null; + } + } + /** * @see StandardCharsets#UTF_8#encode(String) * @param input @@ -556,7 +586,7 @@ public class StringFunctions { @ScalarFunction public static String encodeUrl(String input) throws UnsupportedEncodingException { - return URLEncoder.encode(input, StandardCharsets.UTF_8.toString()); + return URLEncoder.encode(input, StandardCharsets.UTF_8.toString()); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 235bf75ef2..f20a9be217 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -82,6 +82,9 @@ import org.apache.pinot.spi.stream.PermanentConsumerException; import org.apache.pinot.spi.stream.RowMetadata; import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamDataDecoder; +import org.apache.pinot.spi.stream.StreamDataDecoderImpl; +import org.apache.pinot.spi.stream.StreamDataDecoderResult; import org.apache.pinot.spi.stream.StreamDecoderProvider; import org.apache.pinot.spi.stream.StreamMessageDecoder; import 
org.apache.pinot.spi.stream.StreamMetadataProvider; @@ -212,7 +215,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private final SegmentZKMetadata _segmentZKMetadata; private final TableConfig _tableConfig; private final RealtimeTableDataManager _realtimeTableDataManager; - private final StreamMessageDecoder _messageDecoder; + private final StreamDataDecoder _streamDataDecoder; private final int _segmentMaxRowCount; private final String _resourceDataDir; private final IndexLoadingConfig _indexLoadingConfig; @@ -506,7 +509,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { int streamMessageCount = 0; boolean canTakeMore = true; - GenericRow reuse = new GenericRow(); TransformPipeline.Result reusedResult = new TransformPipeline.Result(); boolean prematureExit = false; for (int index = 0; index < messageCount; index++) { @@ -539,18 +541,18 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { throw new RuntimeException("Realtime segment full"); } - // Index each message - reuse.clear(); - // retrieve metadata from the message batch if available - // this can be overridden by the decoder if there is a better indicator in the message payload - RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index); - - GenericRow decodedRow = _messageDecoder - .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index), - messagesAndOffsets.getMessageLengthAtIndex(index), reuse); - if (decodedRow != null) { + // Decode message + StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index)); + RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata(); + if (decodedRow.getException() != null) { + // TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on + // decode error + realtimeRowsDroppedMeter = + 
_serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1, + realtimeRowsDroppedMeter); + } else { try { - _transformPipeline.processRow(decodedRow, reusedResult); + _transformPipeline.processRow(decodedRow.getResult(), reusedResult); } catch (Exception e) { _numRowsErrored++; // when exception happens we prefer abandoning the whole batch and not partially indexing some rows @@ -585,10 +587,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { new SegmentErrorInfo(now(), errorMessage, e)); } } - } else { - realtimeRowsDroppedMeter = - _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1, - realtimeRowsDroppedMeter); } _currentOffset = messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index); @@ -1364,7 +1362,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Create message decoder Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema); - _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead); + StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead); + _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder); _clientId = streamTopic + "-" + _partitionGroupId; _transformPipeline = new TransformPipeline(tableConfig, schema); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java index ea705d4929..87963c8e84 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java @@ -135,6 +135,7 @@ public class 
HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet segmentMetadataFromDirectEndpoint.get("segment.total.docs")); } + // TODO: This test fails when using `llc` consumer mode. Needs investigation @Test public void testSegmentListApi() throws Exception { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java index 479469db97..4fd01562cc 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java @@ -18,16 +18,17 @@ */ package org.apache.pinot.plugin.stream.kafka20; +import java.nio.ByteBuffer; import java.util.List; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.RowMetadata; +import org.apache.pinot.spi.stream.StreamMessage; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; -public class KafkaMessageBatch implements MessageBatch<byte[]> { - - private final List<MessageAndOffsetAndMetadata> _messageList; +public class KafkaMessageBatch implements MessageBatch<StreamMessage> { + private final List<StreamMessage> _messageList; private final int _unfilteredMessageCount; private final long _lastOffset; @@ -36,7 +37,7 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> { * @param lastOffset the offset of the last message in the batch * @param batch the messages, which may be smaller than {@see unfilteredMessageCount} */ - public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<MessageAndOffsetAndMetadata> batch) { + public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, 
List<StreamMessage> batch) { _messageList = batch; _lastOffset = lastOffset; _unfilteredMessageCount = unfilteredMessageCount; @@ -53,18 +54,18 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> { } @Override - public byte[] getMessageAtIndex(int index) { - return _messageList.get(index).getMessage().array(); + public StreamMessage getMessageAtIndex(int index) { + return _messageList.get(index); } @Override public int getMessageOffsetAtIndex(int index) { - return _messageList.get(index).getMessage().arrayOffset(); + return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset(); } @Override public int getMessageLengthAtIndex(int index) { - return _messageList.get(index).payloadSize(); + return _messageList.get(index).getValue().length; } @Override @@ -74,7 +75,7 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> { @Override public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) { - return new LongMsgOffset(_messageList.get(index).getNextOffset()); + return new LongMsgOffset(((KafkaStreamMessage) _messageList.get(index)).getNextOffset()); } @Override @@ -84,6 +85,16 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> { @Override public RowMetadata getMetadataAtIndex(int index) { - return _messageList.get(index).getRowMetadata(); + return _messageList.get(index).getMetadata(); + } + + @Override + public byte[] getMessageBytesAtIndex(int index) { + return _messageList.get(index).getValue(); + } + + @Override + public StreamMessage getStreamMessage(int index) { + return _messageList.get(index); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMetadataExtractor.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMetadataExtractor.java new file mode 100644 index 0000000000..922e165301 --- /dev/null +++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMetadataExtractor.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kafka20; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.RowMetadata; + + +@FunctionalInterface +public interface KafkaMetadataExtractor { + static KafkaMetadataExtractor build(boolean populateMetadata) { + return record -> { + if (!populateMetadata) { + long recordTimestamp = record.timestamp(); + Map<String, String> metadataMap = new HashMap<>(); + metadataMap.put(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY, String.valueOf(record.offset())); + metadataMap.put(KafkaStreamMessageMetadata.RECORD_TIMESTAMP_KEY, String.valueOf(recordTimestamp)); + return new KafkaStreamMessageMetadata(recordTimestamp, RowMetadata.EMPTY_ROW, metadataMap); + } + GenericRow headerGenericRow = new GenericRow(); + Headers headers = record.headers(); 
+ if (headers != null) { + Header[] headersArray = headers.toArray(); + for (Header header : headersArray) { + headerGenericRow.putValue(header.key(), header.value()); + } + } + Map<String, String> metadata = new HashMap<>(); + metadata.put(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY, String.valueOf(record.offset())); + metadata.put(KafkaStreamMessageMetadata.RECORD_TIMESTAMP_KEY, String.valueOf(record.timestamp())); + return new KafkaStreamMessageMetadata(record.timestamp(), headerGenericRow, metadata); + }; + } + + RowMetadata extract(ConsumerRecord<?, ?> consumerRecord); +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java index 7c1998dfc8..ca6290d59a 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java @@ -45,7 +45,7 @@ public abstract class KafkaPartitionLevelConnectionHandler { protected final String _topic; protected final Consumer<String, Bytes> _consumer; protected final TopicPartition _topicPartition; - protected final RowMetadataExtractor _rowMetadataExtractor; + protected final KafkaMetadataExtractor _kafkaMetadataExtractor; public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) { _config = new KafkaPartitionLevelStreamConfig(streamConfig); @@ -64,7 +64,7 @@ public abstract class KafkaPartitionLevelConnectionHandler { _consumer = new KafkaConsumer<>(consumerProp); _topicPartition = new TopicPartition(_topic, _partition); _consumer.assign(Collections.singletonList(_topicPartition)); - 
_rowMetadataExtractor = RowMetadataExtractor.build(_config.isPopulateMetadata()); + _kafkaMetadataExtractor = KafkaMetadataExtractor.build(_config.isPopulateMetadata()); } public void close() diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java index bf212cd855..b6b116164f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.plugin.stream.kafka20; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -28,6 +29,8 @@ import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.PartitionLevelConsumer; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pinot.spi.stream.StreamMessageMetadata; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,30 +46,35 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa } @Override - public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset, + public MessageBatch<StreamMessage> fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) { final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset(); final long endOffset = endMsgOffset == null ? 
Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset(); return fetchMessages(startOffset, endOffset, timeoutMillis); } - public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) { + public MessageBatch<StreamMessage> fetchMessages(long startOffset, long endOffset, int timeoutMillis) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset, endOffset, timeoutMillis); } + LOGGER.warn("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset, + endOffset, timeoutMillis); _consumer.seek(_topicPartition, startOffset); ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis)); List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition); - List<MessageAndOffsetAndMetadata> filtered = new ArrayList<>(messageAndOffsets.size()); + List<StreamMessage> filtered = new ArrayList<>(messageAndOffsets.size()); long lastOffset = startOffset; for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) { + String key = messageAndOffset.key(); + byte[] keyBytes = key == null ? 
null : key.getBytes(StandardCharsets.UTF_8); Bytes message = messageAndOffset.value(); long offset = messageAndOffset.offset(); if (offset >= startOffset & (endOffset > offset | endOffset == -1)) { if (message != null) { + StreamMessageMetadata rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset); filtered.add( - new MessageAndOffsetAndMetadata(message.get(), offset, _rowMetadataExtractor.extract(messageAndOffset))); + new KafkaStreamMessage(keyBytes, message.get(), rowMetadata)); } else if (LOGGER.isDebugEnabled()) { LOGGER.debug("tombstone message at offset {}", offset); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/MessageAndOffset.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java similarity index 58% rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/MessageAndOffset.java rename to pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java index 124ca86fcb..8124bea53d 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/MessageAndOffset.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java @@ -16,38 +16,23 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.plugin.stream.kafka; +package org.apache.pinot.plugin.stream.kafka20; -import java.nio.ByteBuffer; +import javax.annotation.Nullable; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pinot.spi.stream.StreamMessageMetadata; -public class MessageAndOffset { - - private ByteBuffer _message; - private long _offset; - - public MessageAndOffset(byte[] message, long offset) { - this(ByteBuffer.wrap(message), offset); - } - - public MessageAndOffset(ByteBuffer message, long offset) { - _message = message; - _offset = offset; - } - - public ByteBuffer getMessage() { - return _message; - } - - public long getOffset() { - return _offset; +public class KafkaStreamMessage extends StreamMessage { + public KafkaStreamMessage(@Nullable byte[] key, byte[] value, @Nullable StreamMessageMetadata metadata) { + super(key, value, metadata); } public long getNextOffset() { - return getOffset() + 1; - } - - public int payloadSize() { - return getMessage().array().length; + if (_metadata != null) { + long offset = Long.parseLong(_metadata.getRecordMetadata().get(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY)); + return offset < 0 ? 
-1 : offset + 1; + } + return -1; } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessageMetadata.java similarity index 65% rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java rename to pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessageMetadata.java index 81402bb7d5..ada4e22bbb 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessageMetadata.java @@ -18,16 +18,18 @@ */ package org.apache.pinot.plugin.stream.kafka20; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.pinot.spi.stream.RowMetadata; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.stream.StreamMessageMetadata; -@FunctionalInterface -public interface RowMetadataExtractor { - static RowMetadataExtractor build(boolean populateMetadata) { - return populateMetadata ? 
record -> new StreamMessageMetadata(record.timestamp()) : record -> null; - } +public class KafkaStreamMessageMetadata extends StreamMessageMetadata { + public static final String METADATA_OFFSET_KEY = "offset"; + public static final String RECORD_TIMESTAMP_KEY = "recordTimestamp"; - RowMetadata extract(ConsumerRecord<?, ?> consumerRecord); + public KafkaStreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers, + Map<String, String> metadata) { + super(recordIngestionTimeMs, headers, metadata); + } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java index dd9790ec92..0bfc7a6cac 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java @@ -18,10 +18,17 @@ */ package org.apache.pinot.plugin.stream.kafka20.server; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.stream.StreamDataProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +75,37 @@ public class KafkaDataProducer implements StreamDataProducer { _producer.flush(); } + @Override + public void produce(String topic, byte[] key, byte[] payload, GenericRow 
headers) { + List<Header> headerList = new ArrayList<>(); + headerList.add(new RecordHeader("producerTimestamp", String.valueOf(System.currentTimeMillis()).getBytes( + StandardCharsets.UTF_8))); + if (headers != null) { + headers.getFieldToValueMap().forEach((k, v) -> { + Header header = convertToKafkaHeader(k, v); + if (header != null) { + headerList.add(header); + } + }); + } + _producer.send(new ProducerRecord<>(topic, null, key, payload, headerList)); + _producer.flush(); + } + + public Header convertToKafkaHeader(String key, Object value) { + if (value != null) { + if (value instanceof String) { + return new RecordHeader(key, ((String) value).getBytes(StandardCharsets.UTF_8)); + } + if (value instanceof Long) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(((Long) value)); + return new RecordHeader(key, buffer.array()); + } + } + return null; + } + @Override public void close() { _producer.close(); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java index 5e12f79b9f..0a2f7d2f5f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java @@ -281,26 +281,26 @@ public class KafkaPartitionLevelConsumerTest { consumer.fetchMessages(new LongMsgOffset(0), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000); Assert.assertEquals(batch1.getMessageCount(), 500); for (int i = 0; i < batch1.getMessageCount(); i++) { - final byte[] msg = (byte[]) batch1.getMessageAtIndex(i); + final byte[] msg = batch1.getStreamMessage(i).getValue(); Assert.assertEquals(new 
String(msg), "sample_msg_" + i); - Assert.assertNull(batch1.getMetadataAtIndex(i)); + Assert.assertNotNull(batch1.getMetadataAtIndex(i)); } // Test second half batch final MessageBatch batch2 = consumer.fetchMessages(new LongMsgOffset(500), new LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000); Assert.assertEquals(batch2.getMessageCount(), 500); for (int i = 0; i < batch2.getMessageCount(); i++) { - final byte[] msg = (byte[]) batch2.getMessageAtIndex(i); + final byte[] msg = batch2.getStreamMessage(i).getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i)); - Assert.assertNull(batch1.getMetadataAtIndex(i)); + Assert.assertNotNull(batch1.getMetadataAtIndex(i)); } // Some random range final MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(10), new LongMsgOffset(35), 10000); Assert.assertEquals(batch3.getMessageCount(), 25); for (int i = 0; i < batch3.getMessageCount(); i++) { - final byte[] msg = (byte[]) batch3.getMessageAtIndex(i); + final byte[] msg = batch3.getStreamMessage(i).getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i)); - Assert.assertNull(batch1.getMetadataAtIndex(i)); + Assert.assertNotNull(batch1.getMetadataAtIndex(i)); } } } @@ -342,7 +342,7 @@ public class KafkaPartitionLevelConsumerTest { for (int i = 0; i < batch1.getMessageCount(); i++) { final RowMetadata metadata = batch1.getMetadataAtIndex(i); Assert.assertNotNull(metadata); - Assert.assertEquals(metadata.getIngestionTimeMs(), TIMESTAMP + i); + Assert.assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + i); } // Test second half batch final MessageBatch batch2 = @@ -351,7 +351,7 @@ public class KafkaPartitionLevelConsumerTest { for (int i = 0; i < batch2.getMessageCount(); i++) { final RowMetadata metadata = batch2.getMetadataAtIndex(i); Assert.assertNotNull(metadata); - Assert.assertEquals(metadata.getIngestionTimeMs(), TIMESTAMP + (500 + i)); + Assert.assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + (500 + 
i)); } // Some random range final MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(10), new LongMsgOffset(35), 10000); @@ -359,7 +359,7 @@ public class KafkaPartitionLevelConsumerTest { for (int i = 0; i < batch3.getMessageCount(); i++) { final RowMetadata metadata = batch3.getMetadataAtIndex(i); Assert.assertNotNull(metadata); - Assert.assertEquals(metadata.getIngestionTimeMs(), TIMESTAMP + (10 + i)); + Assert.assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + (10 + i)); } } } @@ -388,7 +388,7 @@ public class KafkaPartitionLevelConsumerTest { MessageBatch batch1 = consumer.fetchMessages(new LongMsgOffset(0), new LongMsgOffset(400), 10000); Assert.assertEquals(batch1.getMessageCount(), 200); for (int i = 0; i < batch1.getMessageCount(); i++) { - byte[] msg = (byte[]) batch1.getMessageAtIndex(i); + byte[] msg = batch1.getStreamMessage(i).getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + (i + 200)); } Assert.assertEquals(batch1.getOffsetOfNextBatch().toString(), "400"); @@ -400,7 +400,7 @@ public class KafkaPartitionLevelConsumerTest { MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(201), new LongMsgOffset(401), 10000); Assert.assertEquals(batch3.getMessageCount(), 200); for (int i = 0; i < batch3.getMessageCount(); i++) { - byte[] msg = (byte[]) batch3.getMessageAtIndex(i); + byte[] msg = batch3.getStreamMessage(i).getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + (i + 201)); } Assert.assertEquals(batch3.getOffsetOfNextBatch().toString(), "401"); @@ -408,7 +408,7 @@ public class KafkaPartitionLevelConsumerTest { MessageBatch batch4 = consumer.fetchMessages(new LongMsgOffset(0), null, 10000); Assert.assertEquals(batch4.getMessageCount(), 500); for (int i = 0; i < batch4.getMessageCount(); i++) { - byte[] msg = (byte[]) batch4.getMessageAtIndex(i); + byte[] msg = batch4.getStreamMessage(i).getValue(); Assert.assertEquals(new String(msg), "sample_msg_" + (i + 200)); } 
Assert.assertEquals(batch4.getOffsetOfNextBatch().toString(), "700"); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index d6de8d0168..54d13b7bbe 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -543,7 +543,7 @@ public class MutableSegmentImpl implements MutableSegment { // Update last indexed time and latest ingestion time _lastIndexedTimeMs = System.currentTimeMillis(); if (rowMetadata != null) { - _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, rowMetadata.getIngestionTimeMs()); + _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, rowMetadata.getRecordIngestionTimeMs()); } return canTakeMore; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java index 07bb1ce5a8..5207884ab0 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java @@ -64,7 +64,7 @@ public class IndexingFailureTest { @Test public void testIndexingFailures() throws IOException { - StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis()); + StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow()); GenericRow goodRow = new GenericRow(); goodRow.putValue(INT_COL, 0); goodRow.putValue(STRING_COL, "a"); diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java index 599c05f8b7..3a164387c2 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java @@ -99,7 +99,7 @@ public class MutableSegmentImplAggregateMetricsTest { Map<String, Long> expectedValues = new HashMap<>(); Map<String, Float> expectedValuesFloat = new HashMap<>(); - StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis()); + StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new GenericRow()); for (int i = 0; i < NUM_ROWS; i++) { int hoursSinceEpoch = random.nextInt(10); int daysSinceEpoch = random.nextInt(5); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java index 5c5e2beeec..e0aea45373 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java @@ -202,7 +202,7 @@ public class MutableSegmentImplIngestionAggregationTest { Random random = new Random(seed); - StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis()); + StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis(), new 
GenericRow()); for (int i = 0; i < NUM_ROWS; i++) { GenericRow row = getRow(random); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java index 51e9e094d9..17599453b5 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java @@ -103,7 +103,7 @@ public class MutableSegmentImplRawMVTest { .createMutableSegmentImpl(_schema, noDictionaryColumns, Collections.emptySet(), Collections.emptySet(), false); _lastIngestionTimeMs = System.currentTimeMillis(); - StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs); + StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs, new GenericRow()); _startTimeMs = System.currentTimeMillis(); try (RecordReader recordReader = RecordReaderFactory diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java index 38ae2c5f20..5a58c2dddc 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java @@ -87,7 +87,7 @@ public class MutableSegmentImplTest { .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), false); _lastIngestionTimeMs = System.currentTimeMillis(); - StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs); + 
StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs, new GenericRow()); _startTimeMs = System.currentTimeMillis(); try (RecordReader recordReader = RecordReaderFactory diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java index 4cebfb8a4a..0e80df0234 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java @@ -47,8 +47,23 @@ public interface MessageBatch<T> { * @param index * @return */ + @Deprecated T getMessageAtIndex(int index); + // for backward-compatibility + default byte[] getMessageBytesAtIndex(int index) { + return (byte[]) getMessageAtIndex(index); + } + + default StreamMessage getStreamMessage(int index) { + return new LegacyStreamMessage(getMessageBytesAtIndex(index)); + } + + class LegacyStreamMessage extends StreamMessage { + public LegacyStreamMessage(byte[] value) { + super(value); + } + } /** * Returns the offset of the message at a particular index inside a set of messages returned from the stream. 
* @param index diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java index a8b0f2116f..4c4f17792e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java @@ -18,8 +18,11 @@ */ package org.apache.pinot.spi.stream; +import java.util.Collections; +import java.util.Map; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; +import org.apache.pinot.spi.data.readers.GenericRow; /** @@ -31,13 +34,40 @@ import org.apache.pinot.spi.annotations.InterfaceStability; @InterfaceAudience.Public @InterfaceStability.Evolving public interface RowMetadata { + GenericRow EMPTY_ROW = new GenericRow(); + Map<String, String> EMPTY_COLLECTION = Collections.emptyMap(); /** - * Return the timestamp associated with when the row was ingested upstream. - * Expected to be mainly used for stream-based sources. + * Returns the timestamp associated with the record. This typically refers to the time it was ingested into the + * upstream source. In some cases, it may be the time at which the record was created, aka event time (eg. in kafka, + * a topic may be configured to use record `CreateTime` instead of `LogAppendTime`). + * + * Expected to be used for stream-based sources. * * @return timestamp (epoch in milliseconds) when the row was ingested upstream * Long.MIN_VALUE if not available */ - long getIngestionTimeMs(); + long getRecordIngestionTimeMs(); + + /** + * Returns the stream message headers + * + * @return A {@link GenericRow} that encapsulates the headers in the ingested row + */ + default GenericRow getHeaders() { + EMPTY_ROW.clear(); + return EMPTY_ROW; + } + + /** + * Returns the metadata associated with the stream record + * + * Kafka's record offset would be an example of a metadata associated with the record. 
Record metadata is typically + * stream specific and hence, it is defined as a Map of strings. + * + * @return A Map of record metadata entries. + */ + default Map<String, String> getRecordMetadata() { + return EMPTY_COLLECTION; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoder.java similarity index 58% copy from pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java copy to pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoder.java index 9991f34eac..570197fe7c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoder.java @@ -19,25 +19,17 @@ package org.apache.pinot.spi.stream; /** - * A class that provides metadata associated with the message of a stream, for e.g., - * ingestion-timestamp of the message. + * A decoder for {@link StreamMessage} */ -public class StreamMessageMetadata implements RowMetadata { - - private final long _ingestionTimeMs; - +public interface StreamDataDecoder { /** - * Construct the stream based message/row message metadata + * Decodes a {@link StreamMessage} * - * @param ingestionTimeMs the time that the message was ingested by the stream provider - * use Long.MIN_VALUE if not applicable + * Please note that the expectation is that the implementations of this class should never throw an exception. + * Instead, it should encapsulate the exception within the {@link StreamDataDecoderResult} object. 
+ * + * @param message {@link StreamMessage} that contains the data payload and optionally, a key and row metadata + * @return {@link StreamDataDecoderResult} that either contains the decoded row or the exception */ - public StreamMessageMetadata(long ingestionTimeMs) { - _ingestionTimeMs = ingestionTimeMs; - } - - @Override - public long getIngestionTimeMs() { - return _ingestionTimeMs; - } + StreamDataDecoderResult decode(StreamMessage message); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java new file mode 100644 index 0000000000..3e69dbca03 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.spi.stream; + +import java.nio.charset.StandardCharsets; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StreamDataDecoderImpl implements StreamDataDecoder { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamDataDecoderImpl.class); + + public static final String KEY = "__key"; + public static final String HEADER_KEY_PREFIX = "__header$"; + public static final String METADATA_KEY_PREFIX = "__metadata$"; + + private final StreamMessageDecoder _valueDecoder; + private final GenericRow _reuse = new GenericRow(); + + public StreamDataDecoderImpl(StreamMessageDecoder valueDecoder) { + _valueDecoder = valueDecoder; + } + + @Override + public StreamDataDecoderResult decode(StreamMessage message) { + assert message.getValue() != null; + + try { + _reuse.clear(); + GenericRow row = _valueDecoder.decode(message.getValue(), 0, message.getValue().length, _reuse); + if (row != null) { + if (message.getKey() != null) { + row.putValue(KEY, new String(message.getKey(), StandardCharsets.UTF_8)); + } + RowMetadata metadata = message.getMetadata(); + if (metadata != null) { + metadata.getHeaders().getFieldToValueMap() + .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, value)); + + metadata.getRecordMetadata() + .forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value)); + } + return new StreamDataDecoderResult(row, null); + } else { + return new StreamDataDecoderResult(null, + new RuntimeException("Encountered unknown exception when decoding a Stream message")); + } + } catch (Exception e) { + LOGGER.error("Failed to decode StreamMessage", e); + return new StreamDataDecoderResult(null, e); + } + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/MessageAndOffsetAndMetadata.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderResult.java similarity index 55% rename from pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/MessageAndOffsetAndMetadata.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderResult.java index b83920dbf7..b0f9a32673 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/MessageAndOffsetAndMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderResult.java @@ -16,27 +16,32 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.plugin.stream.kafka20; +package org.apache.pinot.spi.stream; -import java.nio.ByteBuffer; -import org.apache.pinot.plugin.stream.kafka.MessageAndOffset; -import org.apache.pinot.spi.stream.RowMetadata; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.readers.GenericRow; -public class MessageAndOffsetAndMetadata extends MessageAndOffset { - private final RowMetadata _rowMetadata; +/** + * A container class for holding the result of a decoder. + * At any point in time, exactly one of the result and the exception is non-null.
+ */ +public final class StreamDataDecoderResult { + private final GenericRow _result; + private final Exception _exception; - public MessageAndOffsetAndMetadata(byte[] message, long offset, RowMetadata rowMetadata) { - super(message, offset); - _rowMetadata = rowMetadata; + public StreamDataDecoderResult(GenericRow result, Exception exception) { + _result = result; + _exception = exception; } - public MessageAndOffsetAndMetadata(ByteBuffer message, long offset, RowMetadata rowMetadata) { - super(message, offset); - _rowMetadata = rowMetadata; + @Nullable + public GenericRow getResult() { + return _result; } - public RowMetadata getRowMetadata() { - return _rowMetadata; + @Nullable + public Exception getException() { + return _exception; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java index d181c44225..672c7f200f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; import javax.annotation.Nullable; +import org.apache.pinot.spi.data.readers.GenericRow; /** @@ -34,6 +35,10 @@ public interface StreamDataProducer { void produce(String topic, byte[] key, byte[] payload); + default void produce(String topic, byte[] key, byte[] payload, GenericRow headers) { + produce(topic, key, payload); + } + void close(); /** @@ -43,7 +48,7 @@ public interface StreamDataProducer { * @param rows the rows */ default void produceBatch(String topic, List<byte[]> rows) { - for (byte[] row: rows) { + for (byte[] row : rows) { produce(topic, row); } } @@ -54,20 +59,30 @@ public interface StreamDataProducer { * @param topic the topic of the output * @param payloadWithKey the payload rows with key */ - default void produceKeyedBatch(String topic, List<RowWithKey> 
payloadWithKey) { - for (RowWithKey rowWithKey: payloadWithKey) { + default void produceKeyedBatch(String topic, List<RowWithKey> payloadWithKey, boolean includeHeaders) { + for (RowWithKey rowWithKey : payloadWithKey) { if (rowWithKey.getKey() == null) { produce(topic, rowWithKey.getPayload()); } else { - produce(topic, rowWithKey.getKey(), rowWithKey.getPayload()); + if (includeHeaders) { + GenericRow header = new GenericRow(); + header.putValue("header1", System.currentTimeMillis()); + produce(topic, rowWithKey.getKey(), rowWithKey.getPayload(), header); + } else { + produce(topic, rowWithKey.getKey(), rowWithKey.getPayload()); + } } } } - /** - * Helper class so the key and payload can be easily tied together instead of using a pair - * The class is intended for StreamDataProducer only - */ + default void produceKeyedBatch(String topic, List<RowWithKey> payloadWithKey) { + produceKeyedBatch(topic, payloadWithKey, false); + } + + /** + * Helper class so the key and payload can be easily tied together instead of using a pair + * The class is intended for StreamDataProducer only + */ class RowWithKey { private final byte[] _key; private final byte[] _payload; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java new file mode 100644 index 0000000000..6eaf099c12 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.stream; + +import javax.annotation.Nullable; + + +/** + * Represents a Stream message which includes the following components: + * 1. record key (optional) + * 2. record value (required) + * 3. StreamMessageMetadata (optional) - encapsulates record headers and metadata associated with a stream message + * (such as a message identifier, publish timestamp, user-provided headers, etc.) + * + * Similar to the value decoder, each implementing stream plugin can have a key decoder and header extractor. + * If key and header extraction are enabled for the table, the decoded row will contain these fields as: + * "__key", "__header$HEADER_KEY" or "__metadata$RECORD_TIMESTAMP" + * + * These columns can be treated like any other Pinot table column. + * + * Usability note: In order to achieve this, the table configuration should enable the "populate metadata" option. + * Additionally, the Pinot table schema should refer to these fields. Otherwise, even though the fields are extracted, + * they will not materialize in the Pinot table.
+ */ +public class StreamMessage { + private final byte[] _key; + private final byte[] _value; + protected final StreamMessageMetadata _metadata; + + public StreamMessage(@Nullable byte[] key, byte[] value, @Nullable StreamMessageMetadata metadata) { + _key = key; + _value = value; + _metadata = metadata; + } + + public StreamMessage(byte[] value) { + this(null, value, null); + } + + public byte[] getValue() { + return _value; + } + + @Nullable + public StreamMessageMetadata getMetadata() { + return _metadata; + } + + @Nullable + public byte[] getKey() { + return _key; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java index 9991f34eac..2b0860690a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java @@ -18,26 +18,51 @@ */ package org.apache.pinot.spi.stream; +import java.util.Collections; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.readers.GenericRow; + + /** * A class that provides metadata associated with the message of a stream, for e.g., - * ingestion-timestamp of the message. + * timestamp derived from the incoming record (not the ingestion time). 
*/ public class StreamMessageMetadata implements RowMetadata { + private final long _recordIngestionTimeMs; + private final GenericRow _headers; + private final Map<String, String> _metadata; - private final long _ingestionTimeMs; + public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers) { + this(recordIngestionTimeMs, headers, Collections.emptyMap()); + } /** * Construct the stream based message/row message metadata * - * @param ingestionTimeMs the time that the message was ingested by the stream provider + * @param recordIngestionTimeMs the time that the message was ingested by the stream provider * use Long.MIN_VALUE if not applicable + * @param metadata */ - public StreamMessageMetadata(long ingestionTimeMs) { - _ingestionTimeMs = ingestionTimeMs; + public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers, + Map<String, String> metadata) { + _recordIngestionTimeMs = recordIngestionTimeMs; + _headers = headers; + _metadata = metadata; + } + + @Override + public long getRecordIngestionTimeMs() { + return _recordIngestionTimeMs; + } + + @Override + public GenericRow getHeaders() { + return _headers; } @Override - public long getIngestionTimeMs() { - return _ingestionTimeMs; + public Map<String, String> getRecordMetadata() { + return _metadata; } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java new file mode 100644 index 0000000000..0483b7d1ac --- /dev/null +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.stream; + +import com.google.common.collect.ImmutableSet; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class StreamDataDecoderImplTest { + private static final String NAME_FIELD = "name"; + private static final String AGE_HEADER_KEY = "age"; + private static final String SEQNO_RECORD_METADATA = "seqNo"; + + @Test + public void testDecodeValueOnly() + throws Exception { + TestDecoder messageDecoder = new TestDecoder(); + messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), ""); + String value = "Alice"; + StreamMessage message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8)); + StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message); + Assert.assertNotNull(result); + Assert.assertNull(result.getException()); + Assert.assertNotNull(result.getResult()); + + GenericRow row = result.getResult(); + Assert.assertEquals(row.getFieldToValueMap().size(), 1); + Assert.assertEquals(String.valueOf(row.getValue(NAME_FIELD)), value); + } + + @Test + public void testDecodeKeyAndHeaders() + throws Exception { + TestDecoder messageDecoder = new TestDecoder(); + 
messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), ""); + String value = "Alice"; + String key = "id-1"; + GenericRow headers = new GenericRow(); + headers.putValue(AGE_HEADER_KEY, 3); + Map<String, String> recordMetadata = Collections.singletonMap(SEQNO_RECORD_METADATA, "1"); + StreamMessageMetadata metadata = new StreamMessageMetadata(1234L, headers, recordMetadata); + StreamMessage message = new StreamMessage(key.getBytes(StandardCharsets.UTF_8), + value.getBytes(StandardCharsets.UTF_8), metadata); + + StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message); + Assert.assertNotNull(result); + Assert.assertNull(result.getException()); + Assert.assertNotNull(result.getResult()); + + GenericRow row = result.getResult(); + Assert.assertEquals(row.getFieldToValueMap().size(), 4); + Assert.assertEquals(row.getValue(NAME_FIELD), value); + Assert.assertEquals(row.getValue(StreamDataDecoderImpl.KEY), key, "Failed to decode record key"); + Assert.assertEquals(row.getValue(StreamDataDecoderImpl.HEADER_KEY_PREFIX + AGE_HEADER_KEY), 3); + Assert.assertEquals(row.getValue(StreamDataDecoderImpl.METADATA_KEY_PREFIX + SEQNO_RECORD_METADATA), "1"); + } + + @Test + public void testNoExceptionIsThrown() + throws Exception { + ThrowingDecoder messageDecoder = new ThrowingDecoder(); + messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), ""); + String value = "Alice"; + StreamMessage message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8)); + StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message); + Assert.assertNotNull(result); + Assert.assertNotNull(result.getException()); + Assert.assertNull(result.getResult()); + } + + class ThrowingDecoder implements StreamMessageDecoder<byte[]> { + + @Override + public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) + throws Exception { } + + @Nullable + @Override + public GenericRow 
decode(byte[] payload, GenericRow destination) { + throw new RuntimeException("something failed during decoding"); + } + + @Nullable + @Override + public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { + return decode(payload, destination); + } + } + + + class TestDecoder implements StreamMessageDecoder<byte[]> { + @Override + public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) + throws Exception { } + + @Nullable + @Override + public GenericRow decode(byte[] payload, GenericRow destination) { + destination.putValue(NAME_FIELD, new String(payload, StandardCharsets.UTF_8)); + return destination; + } + + @Nullable + @Override + public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { + return decode(payload, destination); + } + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java similarity index 51% copy from pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java copy to pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java index 9991f34eac..926761109b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java @@ -18,26 +18,26 @@ */ package org.apache.pinot.spi.stream; -/** - * A class that provides metadata associated with the message of a stream, for e.g., - * ingestion-timestamp of the message. 
- */ -public class StreamMessageMetadata implements RowMetadata { +import java.nio.charset.StandardCharsets; +import org.testng.Assert; +import org.testng.annotations.Test; - private final long _ingestionTimeMs; - /** - * Construct the stream based message/row message metadata - * - * @param ingestionTimeMs the time that the message was ingested by the stream provider - * use Long.MIN_VALUE if not applicable - */ - public StreamMessageMetadata(long ingestionTimeMs) { - _ingestionTimeMs = ingestionTimeMs; - } +public class StreamMessageTest { + + @Test + public void testAllowNullKeyAndMetadata() { + StreamMessage msg = new StreamMessage("hello".getBytes(StandardCharsets.UTF_8)); + Assert.assertNull(msg.getKey()); + Assert.assertNull(msg.getMetadata()); + Assert.assertEquals(new String(msg.getValue()), "hello"); - @Override - public long getIngestionTimeMs() { - return _ingestionTimeMs; + StreamMessage msg1 = new StreamMessage("key".getBytes(StandardCharsets.UTF_8), + "value".getBytes(StandardCharsets.UTF_8), null); + Assert.assertNotNull(msg1.getKey()); + Assert.assertEquals(new String(msg1.getKey()), "key"); + Assert.assertNotNull(msg1.getValue()); + Assert.assertEquals(new String(msg1.getValue()), "value"); + Assert.assertNull(msg1.getMetadata()); } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java index 7078168965..93267a394e 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java @@ -399,7 +399,7 @@ public abstract class QuickStartBase { case "meetupRsvp": kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(10)); printStatus(Quickstart.Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****"); - MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(); + MeetupRsvpStream meetupRSVPProvider = new 
MeetupRsvpStream(true); meetupRSVPProvider.run(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java index 87c527f9e1..362ab88daf 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java @@ -96,7 +96,7 @@ public class PinotRealtimeSource implements AutoCloseable { } else { _rateLimiter.acquire(rows.size()); if (!_shutdown) { - _producer.produceKeyedBatch(_topicName, rows); + _producer.produceKeyedBatch(_topicName, rows, true); } } } diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json index 41379c2569..0a85aaf577 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json @@ -38,7 +38,8 @@ "stream.kafka.broker.list": "localhost:19092", "stream.kafka.consumer.prop.auto.offset.reset": "largest", "realtime.segment.flush.threshold.time": "12h", - "realtime.segment.flush.threshold.size": "100M" + "realtime.segment.flush.threshold.size": "10K", + "stream.kafka.metadata.populate": "true" } }, "metadata": { diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json index 8109f56d40..93deb2d137 100644 --- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json +++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json @@ -6,6 +6,14 @@ } ], "dimensionFieldSpecs": [ + { + "dataType": "STRING", + 
"name": "__key" + }, + { + "dataType": "STRING", + "name": "header$producerTimestamp" + }, { "dataType": "STRING", "name": "venue_name" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org