This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 737d443f60 Extract record keys, headers and metadata from Stream 
sources (#9224)
737d443f60 is described below

commit 737d443f601703ce17d1e3b8a27bd853c88cfa57
Author: Navina Ramesh <nav...@apache.org>
AuthorDate: Wed Sep 28 11:04:56 2022 +0530

    Extract record keys, headers and metadata from Stream sources (#9224)
    
    * initial commit from kishore
    
    adding StreamMessage concept and using it with kafka
    
    move StreamMessage into pinot-spi
    
    Use StreamDataDecoder interface for decoding a StreamMessage in LLRealtimeSegmentDataManager
    
    verified key and header with realtime quick start
    
    checkstyle
    
    updating realtime quickstart to include headers and key parsing
    
    fix failing tests
    
    checkstyle
    
    revert to continue using hlc in integration tests
    
    added TODO for failing test; revert adding header in the integ test
    
    nit
    
    Deleting unused class: MessageAndOffset and MessageAndOffsetAndMetadata
    
    * Adding metadata to RowMetadata to distinguish it from headers
    Addressing feedback
    
    * clear reuse genericrow
    
    * Making offset a part of KafkaStreamMessageMetadata; changed StreamMessage into a concrete class
    
    * rename RowMetadataExtractor.java to KafkaMetadataExtractor.java
    
    * adding unit test
    addressing PR feedback
    
    * Always populate offset and ingestion time in record time
    
    * update the comment
    
    * rename to recordIngestionTimeMs; Fixed unit test assertion in 
KafkaPartition consumer -  Kafka will always have metadata
    
    * include kafka recordTimestamp in streamMessageMetadata object
    
    * do not use Map.of
---
 .../common/function/scalar/StringFunctions.java    |  32 ++++-
 .../realtime/LLRealtimeSegmentDataManager.java     |  35 +++---
 .../tests/HybridClusterIntegrationTest.java        |   1 +
 .../plugin/stream/kafka20/KafkaMessageBatch.java   |  31 +++--
 .../stream/kafka20/KafkaMetadataExtractor.java     |  57 +++++++++
 .../KafkaPartitionLevelConnectionHandler.java      |   4 +-
 .../kafka20/KafkaPartitionLevelConsumer.java       |  16 ++-
 .../plugin/stream/kafka20/KafkaStreamMessage.java} |  39 ++----
 ...ractor.java => KafkaStreamMessageMetadata.java} |  18 +--
 .../stream/kafka20/server/KafkaDataProducer.java   |  38 ++++++
 .../kafka20/KafkaPartitionLevelConsumerTest.java   |  24 ++--
 .../indexsegment/mutable/MutableSegmentImpl.java   |   2 +-
 .../indexsegment/mutable/IndexingFailureTest.java  |   2 +-
 .../MutableSegmentImplAggregateMetricsTest.java    |   2 +-
 ...MutableSegmentImplIngestionAggregationTest.java |   2 +-
 .../mutable/MutableSegmentImplRawMVTest.java       |   2 +-
 .../mutable/MutableSegmentImplTest.java            |   2 +-
 .../org/apache/pinot/spi/stream/MessageBatch.java  |  15 +++
 .../org/apache/pinot/spi/stream/RowMetadata.java   |  36 +++++-
 ...MessageMetadata.java => StreamDataDecoder.java} |  26 ++--
 .../pinot/spi/stream/StreamDataDecoderImpl.java    |  70 +++++++++++
 .../pinot/spi/stream/StreamDataDecoderResult.java  |  33 +++---
 .../pinot/spi/stream/StreamDataProducer.java       |  31 +++--
 .../org/apache/pinot/spi/stream/StreamMessage.java |  69 +++++++++++
 .../pinot/spi/stream/StreamMessageMetadata.java    |  39 ++++--
 .../spi/stream/StreamDataDecoderImplTest.java      | 132 +++++++++++++++++++++
 .../pinot/spi/stream/StreamMessageTest.java}       |  36 +++---
 .../org/apache/pinot/tools/QuickStartBase.java     |   2 +-
 .../pinot/tools/streams/PinotRealtimeSource.java   |   2 +-
 .../meetupRsvp_realtime_table_config.json          |   3 +-
 .../stream/meetupRsvp/meetupRsvp_schema.json       |   8 ++
 31 files changed, 651 insertions(+), 158 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
index 4277cec152..59862ccd86 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
@@ -384,6 +384,36 @@ public class StringFunctions {
     return new String(result);
   }
 
+  /**
+   * Decodes a byte array into a String using the given charset.
+   * @param bytes the byte array to decode
+   * @param charsetName the name of the charset used for decoding
+   * @return the decoded String, or null if the charset is unsupported
+   */
+  @ScalarFunction
+  public static String fromBytes(byte[] bytes, String charsetName) {
+    try {
+      return new String(bytes, charsetName);
+    } catch (UnsupportedEncodingException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Encodes a String into a byte array using the given charset.
+   * @param input the String to encode
+   * @param charsetName the name of the charset used for encoding
+   * @return the encoded byte array, or null if the charset is unsupported
+   */
+  @ScalarFunction
+  public static byte[] toBytes(String input, String charsetName) {
+    try {
+      return input.getBytes(charsetName);
+    } catch (UnsupportedEncodingException e) {
+      return null;
+    }
+  }
+
   /**
    * @see StandardCharsets#UTF_8#encode(String)
    * @param input
@@ -556,7 +586,7 @@ public class StringFunctions {
   @ScalarFunction
   public static String encodeUrl(String input)
       throws UnsupportedEncodingException {
-      return URLEncoder.encode(input, StandardCharsets.UTF_8.toString());
+    return URLEncoder.encode(input, StandardCharsets.UTF_8.toString());
   }
 
   /**
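
For reference, a minimal sketch of how the new fromBytes/toBytes scalar functions behave (the class and method names come from the diff above; the wrapper class and charset literals are illustrative only):

    import org.apache.pinot.common.function.scalar.StringFunctions;

    public class StringFunctionsExample {
      public static void main(String[] args) {
        // Round-trip a String through the new helpers
        byte[] bytes = StringFunctions.toBytes("hello", "UTF-8");
        System.out.println(StringFunctions.fromBytes(bytes, "UTF-8"));      // hello
        // An unsupported charset name is swallowed and surfaces as null
        System.out.println(StringFunctions.fromBytes(bytes, "no-such-cs")); // null
      }
    }
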
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 235bf75ef2..f20a9be217 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -82,6 +82,9 @@ import org.apache.pinot.spi.stream.PermanentConsumerException;
 import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamDataDecoder;
+import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
+import org.apache.pinot.spi.stream.StreamDataDecoderResult;
 import org.apache.pinot.spi.stream.StreamDecoderProvider;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -212,7 +215,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   private final SegmentZKMetadata _segmentZKMetadata;
   private final TableConfig _tableConfig;
   private final RealtimeTableDataManager _realtimeTableDataManager;
-  private final StreamMessageDecoder _messageDecoder;
+  private final StreamDataDecoder _streamDataDecoder;
   private final int _segmentMaxRowCount;
   private final String _resourceDataDir;
   private final IndexLoadingConfig _indexLoadingConfig;
@@ -506,7 +509,6 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     int streamMessageCount = 0;
     boolean canTakeMore = true;
 
-    GenericRow reuse = new GenericRow();
     TransformPipeline.Result reusedResult = new TransformPipeline.Result();
     boolean prematureExit = false;
     for (int index = 0; index < messageCount; index++) {
@@ -539,18 +541,18 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
         throw new RuntimeException("Realtime segment full");
       }
 
-      // Index each message
-      reuse.clear();
-      // retrieve metadata from the message batch if available
-      // this can be overridden by the decoder if there is a better indicator 
in the message payload
-      RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index);
-
-      GenericRow decodedRow = _messageDecoder
-          .decode(messagesAndOffsets.getMessageAtIndex(index), 
messagesAndOffsets.getMessageOffsetAtIndex(index),
-              messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
-      if (decodedRow != null) {
+      // Decode message
+      StreamDataDecoderResult decodedRow = 
_streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
+      RowMetadata msgMetadata = 
messagesAndOffsets.getStreamMessage(index).getMetadata();
+      if (decodedRow.getException() != null) {
+        // TODO: based on a config, decide whether the record should be 
silently dropped or stop further consumption on
+        // decode error
+        realtimeRowsDroppedMeter =
+            _serverMetrics.addMeteredTableValue(_metricKeyName, 
ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                realtimeRowsDroppedMeter);
+      } else {
         try {
-          _transformPipeline.processRow(decodedRow, reusedResult);
+          _transformPipeline.processRow(decodedRow.getResult(), reusedResult);
         } catch (Exception e) {
           _numRowsErrored++;
           // when exception happens we prefer abandoning the whole batch and 
not partially indexing some rows
@@ -585,10 +587,6 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
                 new SegmentErrorInfo(now(), errorMessage, e));
           }
         }
-      } else {
-        realtimeRowsDroppedMeter =
-            _serverMetrics.addMeteredTableValue(_metricKeyName, 
ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
-                realtimeRowsDroppedMeter);
       }
 
       _currentOffset = 
messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index);
@@ -1364,7 +1362,8 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
 
     // Create message decoder
     Set<String> fieldsToRead = 
IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), 
_schema);
-    _messageDecoder = 
StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
+    StreamMessageDecoder streamMessageDecoder = 
StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
+    _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
     _clientId = streamTopic + "-" + _partitionGroupId;
 
     _transformPipeline = new TransformPipeline(tableConfig, schema);
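
To illustrate the new consumption path wired up above, a minimal sketch, assuming a MessageBatch named `batch` and a plugin-provided StreamMessageDecoder named `valueDecoder` are in scope (metrics and error handling are simplified relative to the hunk above):

    StreamDataDecoder streamDataDecoder = new StreamDataDecoderImpl(valueDecoder);
    for (int index = 0; index < batch.getMessageCount(); index++) {
      StreamDataDecoderResult result = streamDataDecoder.decode(batch.getStreamMessage(index));
      if (result.getException() != null) {
        // count the row as INVALID_REALTIME_ROWS_DROPPED instead of failing the batch
      } else {
        GenericRow row = result.getResult(); // payload fields plus __key, __header$..., __metadata$...
      }
    }
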
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index ea705d4929..87963c8e84 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -135,6 +135,7 @@ public class HybridClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         segmentMetadataFromDirectEndpoint.get("segment.total.docs"));
   }
 
+  // TODO: This test fails when using `llc` consumer mode. Needs investigation
   @Test
   public void testSegmentListApi()
       throws Exception {
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index 479469db97..4fd01562cc 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -18,16 +18,17 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.stream.StreamMessage;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 
 
-public class KafkaMessageBatch implements MessageBatch<byte[]> {
-
-  private final List<MessageAndOffsetAndMetadata> _messageList;
+public class KafkaMessageBatch implements MessageBatch<StreamMessage> {
+  private final List<StreamMessage> _messageList;
   private final int _unfilteredMessageCount;
   private final long _lastOffset;
 
@@ -36,7 +37,7 @@ public class KafkaMessageBatch implements 
MessageBatch<byte[]> {
    * @param lastOffset the offset of the last message in the batch
    * @param batch the messages, which may be smaller than {@see 
unfilteredMessageCount}
    */
-  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, 
List<MessageAndOffsetAndMetadata> batch) {
+  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, 
List<StreamMessage> batch) {
     _messageList = batch;
     _lastOffset = lastOffset;
     _unfilteredMessageCount = unfilteredMessageCount;
@@ -53,18 +54,18 @@ public class KafkaMessageBatch implements 
MessageBatch<byte[]> {
   }
 
   @Override
-  public byte[] getMessageAtIndex(int index) {
-    return _messageList.get(index).getMessage().array();
+  public StreamMessage getMessageAtIndex(int index) {
+    return _messageList.get(index);
   }
 
   @Override
   public int getMessageOffsetAtIndex(int index) {
-    return _messageList.get(index).getMessage().arrayOffset();
+    return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset();
   }
 
   @Override
   public int getMessageLengthAtIndex(int index) {
-    return _messageList.get(index).payloadSize();
+    return _messageList.get(index).getValue().length;
   }
 
   @Override
@@ -74,7 +75,7 @@ public class KafkaMessageBatch implements 
MessageBatch<byte[]> {
 
   @Override
   public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int 
index) {
-    return new LongMsgOffset(_messageList.get(index).getNextOffset());
+    return new LongMsgOffset(((KafkaStreamMessage) 
_messageList.get(index)).getNextOffset());
   }
 
   @Override
@@ -84,6 +85,16 @@ public class KafkaMessageBatch implements 
MessageBatch<byte[]> {
 
   @Override
   public RowMetadata getMetadataAtIndex(int index) {
-    return _messageList.get(index).getRowMetadata();
+    return _messageList.get(index).getMetadata();
+  }
+
+  @Override
+  public byte[] getMessageBytesAtIndex(int index) {
+    return _messageList.get(index).getValue();
+  }
+
+  @Override
+  public StreamMessage getStreamMessage(int index) {
+    return _messageList.get(index);
   }
 }
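
A short sketch of how a caller now reads payloads and metadata out of the batch (assumes a KafkaMessageBatch named `batch` built as above):

    for (int i = 0; i < batch.getMessageCount(); i++) {
      StreamMessage msg = batch.getStreamMessage(i);
      byte[] payload = msg.getValue();          // same bytes that getMessageBytesAtIndex(i) returns
      RowMetadata metadata = msg.getMetadata(); // offset and record timestamp now travel in here
    }
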
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMetadataExtractor.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMetadataExtractor.java
new file mode 100644
index 0000000000..922e165301
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMetadataExtractor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.RowMetadata;
+
+
+@FunctionalInterface
+public interface KafkaMetadataExtractor {
+  static KafkaMetadataExtractor build(boolean populateMetadata) {
+    return record -> {
+      if (!populateMetadata) {
+        long recordTimestamp = record.timestamp();
+        Map<String, String> metadataMap = new HashMap<>();
+        metadataMap.put(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY, 
String.valueOf(record.offset()));
+        metadataMap.put(KafkaStreamMessageMetadata.RECORD_TIMESTAMP_KEY, 
String.valueOf(recordTimestamp));
+        return new KafkaStreamMessageMetadata(recordTimestamp, 
RowMetadata.EMPTY_ROW, metadataMap);
+      }
+      GenericRow headerGenericRow = new GenericRow();
+      Headers headers = record.headers();
+      if (headers != null) {
+        Header[] headersArray = headers.toArray();
+        for (Header header : headersArray) {
+          headerGenericRow.putValue(header.key(), header.value());
+        }
+      }
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY, 
String.valueOf(record.offset()));
+      metadata.put(KafkaStreamMessageMetadata.RECORD_TIMESTAMP_KEY, 
String.valueOf(record.timestamp()));
+      return new KafkaStreamMessageMetadata(record.timestamp(), 
headerGenericRow, metadata);
+    };
+  }
+
+  RowMetadata extract(ConsumerRecord<?, ?> consumerRecord);
+}
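
A minimal sketch of the extractor in use; the ConsumerRecord arguments (topic, partition, offset, key, value) are illustrative:

    KafkaMetadataExtractor extractor = KafkaMetadataExtractor.build(true);
    ConsumerRecord<String, byte[]> record =
        new ConsumerRecord<>("myTopic", 0, 42L, "myKey", "myValue".getBytes(StandardCharsets.UTF_8));
    RowMetadata metadata = extractor.extract(record);
    // The record offset travels in the metadata map as a string
    String offset = metadata.getRecordMetadata().get(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY); // "42"
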
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index 7c1998dfc8..ca6290d59a 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -45,7 +45,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
   protected final String _topic;
   protected final Consumer<String, Bytes> _consumer;
   protected final TopicPartition _topicPartition;
-  protected final RowMetadataExtractor _rowMetadataExtractor;
+  protected final KafkaMetadataExtractor _kafkaMetadataExtractor;
 
   public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig 
streamConfig, int partition) {
     _config = new KafkaPartitionLevelStreamConfig(streamConfig);
@@ -64,7 +64,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
     _consumer = new KafkaConsumer<>(consumerProp);
     _topicPartition = new TopicPartition(_topic, _partition);
     _consumer.assign(Collections.singletonList(_topicPartition));
-    _rowMetadataExtractor = 
RowMetadataExtractor.build(_config.isPopulateMetadata());
+    _kafkaMetadataExtractor = 
KafkaMetadataExtractor.build(_config.isPopulateMetadata());
   }
 
   public void close()
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index bf212cd855..b6b116164f 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,6 +29,8 @@ import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,30 +46,35 @@ public class KafkaPartitionLevelConsumer extends 
KafkaPartitionLevelConnectionHa
   }
 
   @Override
-  public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset 
startMsgOffset,
+  public MessageBatch<StreamMessage> fetchMessages(StreamPartitionMsgOffset 
startMsgOffset,
       StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : 
((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, 
int timeoutMillis) {
+  public MessageBatch<StreamMessage> fetchMessages(long startOffset, long 
endOffset, int timeoutMillis) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: 
{}ms", _topicPartition, startOffset,
           endOffset, timeoutMillis);
     }
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = 
_consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = 
consumerRecords.records(_topicPartition);
-    List<MessageAndOffsetAndMetadata> filtered = new 
ArrayList<>(messageAndOffsets.size());
+    List<StreamMessage> filtered = new ArrayList<>(messageAndOffsets.size());
     long lastOffset = startOffset;
     for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
+      String key = messageAndOffset.key();
+      byte[] keyBytes = key == null ? null : 
key.getBytes(StandardCharsets.UTF_8);
       Bytes message = messageAndOffset.value();
       long offset = messageAndOffset.offset();
       if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
         if (message != null) {
+          StreamMessageMetadata rowMetadata = (StreamMessageMetadata) 
_kafkaMetadataExtractor.extract(messageAndOffset);
           filtered.add(
-              new MessageAndOffsetAndMetadata(message.get(), offset, 
_rowMetadataExtractor.extract(messageAndOffset)));
+              new KafkaStreamMessage(keyBytes, message.get(), rowMetadata));
         } else if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("tombstone message at offset {}", offset);
         }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/MessageAndOffset.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java
similarity index 58%
rename from 
pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/MessageAndOffset.java
rename to 
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java
index 124ca86fcb..8124bea53d 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/MessageAndOffset.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java
@@ -16,38 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.plugin.stream.kafka;
+package org.apache.pinot.plugin.stream.kafka20;
 
-import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
 
 
-public class MessageAndOffset {
-
-  private ByteBuffer _message;
-  private long _offset;
-
-  public MessageAndOffset(byte[] message, long offset) {
-    this(ByteBuffer.wrap(message), offset);
-  }
-
-  public MessageAndOffset(ByteBuffer message, long offset) {
-    _message = message;
-    _offset = offset;
-  }
-
-  public ByteBuffer getMessage() {
-    return _message;
-  }
-
-  public long getOffset() {
-    return _offset;
+public class KafkaStreamMessage extends StreamMessage {
+  public KafkaStreamMessage(@Nullable byte[] key, byte[] value, @Nullable 
StreamMessageMetadata metadata) {
+    super(key, value, metadata);
   }
 
   public long getNextOffset() {
-    return getOffset() + 1;
-  }
-
-  public int payloadSize() {
-    return getMessage().array().length;
+    if (_metadata != null) {
+      long offset = 
Long.parseLong(_metadata.getRecordMetadata().get(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY));
+      return offset < 0 ? -1 : offset + 1;
+    }
+    return -1;
   }
 }
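
Sketch of how getNextOffset() now derives the next fetch offset from the metadata map rather than from a dedicated offset field (values are illustrative; a HashMap is used since the commit avoids Map.of):

    Map<String, String> meta = new HashMap<>();
    meta.put(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY, "41");
    KafkaStreamMessageMetadata metadata =
        new KafkaStreamMessageMetadata(System.currentTimeMillis(), new GenericRow(), meta);
    KafkaStreamMessage message = new KafkaStreamMessage(null, payloadBytes, metadata);
    long next = message.getNextOffset(); // 42; returns -1 when metadata is absent
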
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessageMetadata.java
similarity index 65%
rename from 
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java
rename to 
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessageMetadata.java
index 81402bb7d5..ada4e22bbb 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/RowMetadataExtractor.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessageMetadata.java
@@ -18,16 +18,18 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.pinot.spi.stream.RowMetadata;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
 
 
-@FunctionalInterface
-public interface RowMetadataExtractor {
-  static RowMetadataExtractor build(boolean populateMetadata) {
-    return populateMetadata ? record -> new 
StreamMessageMetadata(record.timestamp()) : record -> null;
-  }
+public class KafkaStreamMessageMetadata extends StreamMessageMetadata {
+  public static final String METADATA_OFFSET_KEY = "offset";
+  public static final String RECORD_TIMESTAMP_KEY = "recordTimestamp";
 
-  RowMetadata extract(ConsumerRecord<?, ?> consumerRecord);
+  public KafkaStreamMessageMetadata(long recordIngestionTimeMs, @Nullable 
GenericRow headers,
+      Map<String, String> metadata) {
+    super(recordIngestionTimeMs, headers, metadata);
+  }
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
index dd9790ec92..0bfc7a6cac 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/server/KafkaDataProducer.java
@@ -18,10 +18,17 @@
  */
 package org.apache.pinot.plugin.stream.kafka20.server;
 
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,6 +75,37 @@ public class KafkaDataProducer implements StreamDataProducer 
{
     _producer.flush();
   }
 
+  @Override
+  public void produce(String topic, byte[] key, byte[] payload, GenericRow 
headers) {
+    List<Header> headerList = new ArrayList<>();
+    headerList.add(new RecordHeader("producerTimestamp", 
String.valueOf(System.currentTimeMillis()).getBytes(
+        StandardCharsets.UTF_8)));
+    if (headers != null) {
+      headers.getFieldToValueMap().forEach((k, v) -> {
+        Header header = convertToKafkaHeader(k, v);
+        if (header != null) {
+          headerList.add(header);
+        }
+      });
+    }
+    _producer.send(new ProducerRecord<>(topic, null, key, payload, 
headerList));
+    _producer.flush();
+  }
+
+  public Header convertToKafkaHeader(String key, Object value) {
+    if (value != null) {
+      if (value instanceof String) {
+        return new RecordHeader(key, ((String) 
value).getBytes(StandardCharsets.UTF_8));
+      }
+      if (value instanceof Long) {
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+        buffer.putLong(((Long) value));
+        return new RecordHeader(key, buffer.array());
+      }
+    }
+    return null;
+  }
+
   @Override
   public void close() {
     _producer.close();
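
A usage sketch for the new header-aware produce overload (topic name and header keys are illustrative; only String and Long header values are translated, anything else is silently dropped):

    GenericRow headers = new GenericRow();
    headers.putValue("source", "quickstart");        // String -> UTF-8 bytes
    headers.putValue("eventTimeMs", 1664348696000L); // Long -> 8-byte array
    producer.produce("myTopic", "myKey".getBytes(StandardCharsets.UTF_8), payloadBytes, headers);
    // a "producerTimestamp" header is always added by the producer itself
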
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index 5e12f79b9f..0a2f7d2f5f 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -281,26 +281,26 @@ public class KafkaPartitionLevelConsumerTest {
           consumer.fetchMessages(new LongMsgOffset(0), new 
LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000);
       Assert.assertEquals(batch1.getMessageCount(), 500);
       for (int i = 0; i < batch1.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) batch1.getMessageAtIndex(i);
+        final byte[] msg = batch1.getStreamMessage(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + i);
-        Assert.assertNull(batch1.getMetadataAtIndex(i));
+        Assert.assertNotNull(batch1.getMetadataAtIndex(i));
       }
       // Test second half batch
       final MessageBatch batch2 =
           consumer.fetchMessages(new LongMsgOffset(500), new 
LongMsgOffset(NUM_MSG_PRODUCED_PER_PARTITION), 10000);
       Assert.assertEquals(batch2.getMessageCount(), 500);
       for (int i = 0; i < batch2.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) batch2.getMessageAtIndex(i);
+        final byte[] msg = batch2.getStreamMessage(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + (500 + i));
-        Assert.assertNull(batch1.getMetadataAtIndex(i));
+        Assert.assertNotNull(batch2.getMetadataAtIndex(i));
       }
       // Some random range
       final MessageBatch batch3 = consumer.fetchMessages(new 
LongMsgOffset(10), new LongMsgOffset(35), 10000);
       Assert.assertEquals(batch3.getMessageCount(), 25);
       for (int i = 0; i < batch3.getMessageCount(); i++) {
-        final byte[] msg = (byte[]) batch3.getMessageAtIndex(i);
+        final byte[] msg = batch3.getStreamMessage(i).getValue();
         Assert.assertEquals(new String(msg), "sample_msg_" + (10 + i));
-        Assert.assertNull(batch1.getMetadataAtIndex(i));
+        Assert.assertNotNull(batch3.getMetadataAtIndex(i));
       }
     }
   }
@@ -342,7 +342,7 @@ public class KafkaPartitionLevelConsumerTest {
       for (int i = 0; i < batch1.getMessageCount(); i++) {
         final RowMetadata metadata = batch1.getMetadataAtIndex(i);
         Assert.assertNotNull(metadata);
-        Assert.assertEquals(metadata.getIngestionTimeMs(), TIMESTAMP + i);
+        Assert.assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + 
i);
       }
       // Test second half batch
       final MessageBatch batch2 =
@@ -351,7 +351,7 @@ public class KafkaPartitionLevelConsumerTest {
       for (int i = 0; i < batch2.getMessageCount(); i++) {
         final RowMetadata metadata = batch2.getMetadataAtIndex(i);
         Assert.assertNotNull(metadata);
-        Assert.assertEquals(metadata.getIngestionTimeMs(), TIMESTAMP + (500 + 
i));
+        Assert.assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + 
(500 + i));
       }
       // Some random range
       final MessageBatch batch3 = consumer.fetchMessages(new 
LongMsgOffset(10), new LongMsgOffset(35), 10000);
@@ -359,7 +359,7 @@ public class KafkaPartitionLevelConsumerTest {
       for (int i = 0; i < batch3.getMessageCount(); i++) {
         final RowMetadata metadata = batch3.getMetadataAtIndex(i);
         Assert.assertNotNull(metadata);
-        Assert.assertEquals(metadata.getIngestionTimeMs(), TIMESTAMP + (10 + 
i));
+        Assert.assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + 
(10 + i));
       }
     }
   }
@@ -388,7 +388,7 @@ public class KafkaPartitionLevelConsumerTest {
     MessageBatch batch1 = consumer.fetchMessages(new LongMsgOffset(0), new 
LongMsgOffset(400), 10000);
     Assert.assertEquals(batch1.getMessageCount(), 200);
     for (int i = 0; i < batch1.getMessageCount(); i++) {
-      byte[] msg = (byte[]) batch1.getMessageAtIndex(i);
+      byte[] msg = batch1.getStreamMessage(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + (i + 200));
     }
     Assert.assertEquals(batch1.getOffsetOfNextBatch().toString(), "400");
@@ -400,7 +400,7 @@ public class KafkaPartitionLevelConsumerTest {
     MessageBatch batch3 = consumer.fetchMessages(new LongMsgOffset(201), new 
LongMsgOffset(401), 10000);
     Assert.assertEquals(batch3.getMessageCount(), 200);
     for (int i = 0; i < batch3.getMessageCount(); i++) {
-      byte[] msg = (byte[]) batch3.getMessageAtIndex(i);
+      byte[] msg = batch3.getStreamMessage(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + (i + 201));
     }
     Assert.assertEquals(batch3.getOffsetOfNextBatch().toString(), "401");
@@ -408,7 +408,7 @@ public class KafkaPartitionLevelConsumerTest {
     MessageBatch batch4 = consumer.fetchMessages(new LongMsgOffset(0), null, 
10000);
     Assert.assertEquals(batch4.getMessageCount(), 500);
     for (int i = 0; i < batch4.getMessageCount(); i++) {
-      byte[] msg = (byte[]) batch4.getMessageAtIndex(i);
+      byte[] msg = batch4.getStreamMessage(i).getValue();
       Assert.assertEquals(new String(msg), "sample_msg_" + (i + 200));
     }
     Assert.assertEquals(batch4.getOffsetOfNextBatch().toString(), "700");
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index d6de8d0168..54d13b7bbe 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -543,7 +543,7 @@ public class MutableSegmentImpl implements MutableSegment {
     // Update last indexed time and latest ingestion time
     _lastIndexedTimeMs = System.currentTimeMillis();
     if (rowMetadata != null) {
-      _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, 
rowMetadata.getIngestionTimeMs());
+      _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, 
rowMetadata.getRecordIngestionTimeMs());
     }
 
     return canTakeMore;
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
index 07bb1ce5a8..5207884ab0 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/IndexingFailureTest.java
@@ -64,7 +64,7 @@ public class IndexingFailureTest {
   @Test
   public void testIndexingFailures()
       throws IOException {
-    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(System.currentTimeMillis());
+    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
     GenericRow goodRow = new GenericRow();
     goodRow.putValue(INT_COL, 0);
     goodRow.putValue(STRING_COL, "a");
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
index 599c05f8b7..3a164387c2 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
@@ -99,7 +99,7 @@ public class MutableSegmentImplAggregateMetricsTest {
 
     Map<String, Long> expectedValues = new HashMap<>();
     Map<String, Float> expectedValuesFloat = new HashMap<>();
-    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(System.currentTimeMillis());
+    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
     for (int i = 0; i < NUM_ROWS; i++) {
       int hoursSinceEpoch = random.nextInt(10);
       int daysSinceEpoch = random.nextInt(5);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
index 5c5e2beeec..e0aea45373 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplIngestionAggregationTest.java
@@ -202,7 +202,7 @@ public class MutableSegmentImplIngestionAggregationTest {
 
 
     Random random = new Random(seed);
-    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(System.currentTimeMillis());
+    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(System.currentTimeMillis(), new GenericRow());
 
     for (int i = 0; i < NUM_ROWS; i++) {
       GenericRow row = getRow(random);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
index 51e9e094d9..17599453b5 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
@@ -103,7 +103,7 @@ public class MutableSegmentImplRawMVTest {
         .createMutableSegmentImpl(_schema, noDictionaryColumns, 
Collections.emptySet(), Collections.emptySet(),
             false);
     _lastIngestionTimeMs = System.currentTimeMillis();
-    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(_lastIngestionTimeMs);
+    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(_lastIngestionTimeMs, new GenericRow());
     _startTimeMs = System.currentTimeMillis();
 
     try (RecordReader recordReader = RecordReaderFactory
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
index 38ae2c5f20..5a58c2dddc 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
@@ -87,7 +87,7 @@ public class MutableSegmentImplTest {
         .createMutableSegmentImpl(_schema, Collections.emptySet(), 
Collections.emptySet(), Collections.emptySet(),
             false);
     _lastIngestionTimeMs = System.currentTimeMillis();
-    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(_lastIngestionTimeMs);
+    StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(_lastIngestionTimeMs, new GenericRow());
     _startTimeMs = System.currentTimeMillis();
 
     try (RecordReader recordReader = RecordReaderFactory
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 4cebfb8a4a..0e80df0234 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -47,8 +47,23 @@ public interface MessageBatch<T> {
    * @param index
    * @return
    */
+  @Deprecated
   T getMessageAtIndex(int index);
 
+  // for backward-compatibility
+  default byte[] getMessageBytesAtIndex(int index) {
+    return (byte[]) getMessageAtIndex(index);
+  }
+
+  default StreamMessage getStreamMessage(int index) {
+    return new LegacyStreamMessage(getMessageBytesAtIndex(index));
+  }
+
+  class LegacyStreamMessage extends StreamMessage {
+    public LegacyStreamMessage(byte[] value) {
+      super(value);
+    }
+  }
   /**
    * Returns the offset of the message at a particular index inside a set of 
messages returned from the stream.
    * @param index
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
index a8b0f2116f..4c4f17792e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pinot.spi.stream;
 
+import java.util.Collections;
+import java.util.Map;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.data.readers.GenericRow;
 
 
 /**
@@ -31,13 +34,40 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RowMetadata {
+  GenericRow EMPTY_ROW = new GenericRow();
+  Map<String, String> EMPTY_COLLECTION = Collections.emptyMap();
 
   /**
-   * Return the timestamp associated with when the row was ingested upstream.
-   * Expected to be mainly used for stream-based sources.
+   * Returns the timestamp associated with the record. This typically refers to the time it was ingested into the
+   * upstream source. In some cases, it may be the time at which the record was created, aka event time (e.g., in
+   * Kafka, a topic may be configured to use record `CreateTime` instead of `LogAppendTime`).
+   *
+   * Expected to be used for stream-based sources.
    *
    * @return timestamp (epoch in milliseconds) when the row was ingested upstream
    *         Long.MIN_VALUE if not available
    */
-  long getIngestionTimeMs();
+  long getRecordIngestionTimeMs();
+
+  /**
+   * Returns the stream message headers
+   *
+   * @return A {@link GenericRow} that encapsulates the headers in the ingested row
+   */
+  default GenericRow getHeaders() {
+    EMPTY_ROW.clear();
+    return EMPTY_ROW;
+  }
+
+  /**
+   * Returns the metadata associated with the stream record
+   *
+   * Kafka's record offset would be an example of metadata associated with the record. Record metadata is typically
+   * stream-specific and hence it is defined as a Map of strings.
+   *
+   * @return A Map of record metadata entries.
+   */
+  default Map<String, String> getRecordMetadata() {
+    return EMPTY_COLLECTION;
+  }
 }
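
Since getRecordIngestionTimeMs() is now the interface's only abstract method, a minimal RowMetadata can be written as a lambda, with the defaults supplying empty headers and metadata (sketch; the timestamp value is illustrative):

    RowMetadata rowMetadata = () -> 1664348696000L;             // getRecordIngestionTimeMs()
    GenericRow headers = rowMetadata.getHeaders();              // empty GenericRow
    Map<String, String> meta = rowMetadata.getRecordMetadata(); // empty map
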
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoder.java
similarity index 58%
copy from 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
copy to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoder.java
index 9991f34eac..570197fe7c 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoder.java
@@ -19,25 +19,17 @@
 package org.apache.pinot.spi.stream;
 
 /**
- * A class that provides metadata associated with the message of a stream, for 
e.g.,
- * ingestion-timestamp of the message.
+ * A decoder for {@link StreamMessage}
  */
-public class StreamMessageMetadata implements RowMetadata {
-
-  private final long _ingestionTimeMs;
-
+public interface StreamDataDecoder {
   /**
-   * Construct the stream based message/row message metadata
+   * Decodes a {@link StreamMessage}
    *
-   * @param ingestionTimeMs  the time that the message was ingested by the 
stream provider
-   *                         use Long.MIN_VALUE if not applicable
+   * Note that implementations of this interface are expected to never throw an exception.
+   * Instead, they should encapsulate any exception within the {@link StreamDataDecoderResult} object.
+   *
+   * @param message {@link StreamMessage} that contains the data payload and optionally, a key and row metadata
+   * @return {@link StreamDataDecoderResult} that either contains the decoded row or the exception
    */
-  public StreamMessageMetadata(long ingestionTimeMs) {
-    _ingestionTimeMs = ingestionTimeMs;
-  }
-
-  @Override
-  public long getIngestionTimeMs() {
-    return _ingestionTimeMs;
-  }
+  StreamDataDecoderResult decode(StreamMessage message);
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
new file mode 100644
index 0000000000..3e69dbca03
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class StreamDataDecoderImpl implements StreamDataDecoder {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamDataDecoderImpl.class);
+
+  public static final String KEY = "__key";
+  public static final String HEADER_KEY_PREFIX = "__header$";
+  public static final String METADATA_KEY_PREFIX = "__metadata$";
+
+  private final StreamMessageDecoder _valueDecoder;
+  private final GenericRow _reuse = new GenericRow();
+
+  public StreamDataDecoderImpl(StreamMessageDecoder valueDecoder) {
+    _valueDecoder = valueDecoder;
+  }
+
+  @Override
+  public StreamDataDecoderResult decode(StreamMessage message) {
+    assert message.getValue() != null;
+
+    try {
+      _reuse.clear();
+      GenericRow row = _valueDecoder.decode(message.getValue(), 0, 
message.getValue().length, _reuse);
+      if (row != null) {
+        if (message.getKey() != null) {
+          row.putValue(KEY, new String(message.getKey(), 
StandardCharsets.UTF_8));
+        }
+        RowMetadata metadata = message.getMetadata();
+        if (metadata != null) {
+          metadata.getHeaders().getFieldToValueMap()
+              .forEach((key, value) -> row.putValue(HEADER_KEY_PREFIX + key, 
value));
+
+          metadata.getRecordMetadata()
+                  .forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + 
key, value));
+        }
+        return new StreamDataDecoderResult(row, null);
+      } else {
+        return new StreamDataDecoderResult(null,
+            new RuntimeException("Encountered unknown exception when decoding 
a Stream message"));
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to decode StreamMessage", e);
+      return new StreamDataDecoderResult(null, e);
+    }
+  }
+}
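
A decode-side sketch showing where the key, headers and metadata end up in the row (assumes a plugin StreamMessageDecoder named `valueDecoder` and a populated StreamMessage named `message` are in scope; the header name "h1" and metadata key "offset" are illustrative):

    StreamDataDecoder decoder = new StreamDataDecoderImpl(valueDecoder);
    StreamDataDecoderResult result = decoder.decode(message);
    if (result.getException() == null) {
      GenericRow row = result.getResult();
      Object key = row.getValue(StreamDataDecoderImpl.KEY);                               // "__key"
      Object header = row.getValue(StreamDataDecoderImpl.HEADER_KEY_PREFIX + "h1");       // "__header$h1"
      Object offset = row.getValue(StreamDataDecoderImpl.METADATA_KEY_PREFIX + "offset"); // "__metadata$offset"
    }
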
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/MessageAndOffsetAndMetadata.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderResult.java
similarity index 55%
rename from 
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/MessageAndOffsetAndMetadata.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderResult.java
index b83920dbf7..b0f9a32673 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/MessageAndOffsetAndMetadata.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderResult.java
@@ -16,27 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.plugin.stream.kafka20;
+package org.apache.pinot.spi.stream;
 
-import java.nio.ByteBuffer;
-import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
-import org.apache.pinot.spi.stream.RowMetadata;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
 
 
-public class MessageAndOffsetAndMetadata extends MessageAndOffset {
-  private final RowMetadata _rowMetadata;
+/**
+ * A container class for holding the result of a decoder.
+ * At any point in time, exactly one of the result and the exception is non-null.
+ */
+public final class StreamDataDecoderResult {
+  private final GenericRow _result;
+  private final Exception _exception;
 
-  public MessageAndOffsetAndMetadata(byte[] message, long offset, RowMetadata 
rowMetadata) {
-    super(message, offset);
-    _rowMetadata = rowMetadata;
+  public StreamDataDecoderResult(GenericRow result, Exception exception) {
+    _result = result;
+    _exception = exception;
   }
 
-  public MessageAndOffsetAndMetadata(ByteBuffer message, long offset, 
RowMetadata rowMetadata) {
-    super(message, offset);
-    _rowMetadata = rowMetadata;
+  @Nullable
+  public GenericRow getResult() {
+    return _result;
   }
 
-  public RowMetadata getRowMetadata() {
-    return _rowMetadata;
+  @Nullable
+  public Exception getException() {
+    return _exception;
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
index d181c44225..672c7f200f 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
 
 
 /**
@@ -34,6 +35,10 @@ public interface StreamDataProducer {
 
   void produce(String topic, byte[] key, byte[] payload);
 
+  default void produce(String topic, byte[] key, byte[] payload, GenericRow 
headers) {
+    produce(topic, key, payload);
+  }
+
   void close();
 
   /**
@@ -43,7 +48,7 @@ public interface StreamDataProducer {
    * @param rows the rows
    */
   default void produceBatch(String topic, List<byte[]> rows) {
-    for (byte[] row: rows) {
+    for (byte[] row : rows) {
       produce(topic, row);
     }
   }
@@ -54,20 +59,30 @@ public interface StreamDataProducer {
    * @param topic the topic of the output
    * @param payloadWithKey the payload rows with key
    */
-  default void produceKeyedBatch(String topic, List<RowWithKey> 
payloadWithKey) {
-    for (RowWithKey rowWithKey: payloadWithKey) {
+  default void produceKeyedBatch(String topic, List<RowWithKey> 
payloadWithKey, boolean includeHeaders) {
+    for (RowWithKey rowWithKey : payloadWithKey) {
       if (rowWithKey.getKey() == null) {
         produce(topic, rowWithKey.getPayload());
       } else {
-        produce(topic, rowWithKey.getKey(), rowWithKey.getPayload());
+        if (includeHeaders) {
+          GenericRow header = new GenericRow();
+          header.putValue("header1", System.currentTimeMillis());
+          produce(topic, rowWithKey.getKey(), rowWithKey.getPayload(), header);
+        } else {
+          produce(topic, rowWithKey.getKey(), rowWithKey.getPayload());
+        }
       }
     }
   }
 
-  /**
-   * Helper class so the key and payload can be easily tied together instead 
of using a pair
-   * The class is intended for StreamDataProducer only
-   */
+  default void produceKeyedBatch(String topic, List<RowWithKey> 
payloadWithKey) {
+    produceKeyedBatch(topic, payloadWithKey, false);
+  }
+
+  /**
+   * Helper class so the key and payload can be easily tied together instead of using a pair
+   * The class is intended for StreamDataProducer only
+   */
   class RowWithKey {
     private final byte[] _key;
     private final byte[] _payload;
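
A usage sketch for the new header flag on produceKeyedBatch (assumes RowWithKey's existing (key, payload) constructor; when the flag is true each record carries the hard-coded "header1" timestamp header shown above):

    List<StreamDataProducer.RowWithKey> rows = new ArrayList<>();
    rows.add(new StreamDataProducer.RowWithKey(keyBytes, payloadBytes));
    producer.produceKeyedBatch("myTopic", rows, true); // headers on
    producer.produceKeyedBatch("myTopic", rows);       // old two-arg overload, headers off
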
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
new file mode 100644
index 0000000000..6eaf099c12
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessage.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Represents a Stream message which includes the following components:
+ * 1. record key (optional)
+ * 2. record value (required)
+ * 3. StreamMessageMetadata (optional) - encapsulates record headers and metadata associated with a stream message
+ *    (such as a message identifier, publish timestamp, user-provided headers, etc.)
+ *
+ * Similar to the value decoder, each implementing stream plugin can have a key decoder and a header extractor.
+ * If key and header extraction are enabled for the table, the decoded row will automatically contain these fields as:
+ * "__header$HEADER_KEY" or "__metadata$RECORD_TIMESTAMP"
+ *
+ * These columns can be treated similar to any other Pinot table column.
+ *
+ * Usability note: In order to achieve this, table configuration should enable 
"populate metadata" option.
+ * Additionally, the pinot table schema should refer these fields. Otherwise, 
even though the fields are extracted,
+ * they will not materialize in the pinot table.
+ */
+public class StreamMessage {
+  private final byte[] _key;
+  private final byte[] _value;
+  protected final StreamMessageMetadata _metadata;
+
+  public StreamMessage(@Nullable byte[] key, byte[] value, @Nullable StreamMessageMetadata metadata) {
+    _key = key;
+    _value = value;
+    _metadata = metadata;
+  }
+
+  public StreamMessage(byte[] value) {
+    this(null, value, null);
+  }
+
+  public byte[] getValue() {
+    return _value;
+  }
+
+  @Nullable
+  public StreamMessageMetadata getMetadata() {
+    return _metadata;
+  }
+
+  @Nullable
+  public byte[] getKey() {
+    return _key;
+  }
+}
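
To make the new SPI concrete, an illustrative construction of a StreamMessage as a stream plugin might build it; the "traceId" header name and its value are hypothetical:

    // Illustrative construction of the new StreamMessage; "traceId" is a made-up header.
    import java.nio.charset.StandardCharsets;
    import org.apache.pinot.spi.data.readers.GenericRow;
    import org.apache.pinot.spi.stream.StreamMessage;
    import org.apache.pinot.spi.stream.StreamMessageMetadata;

    class StreamMessageExample {
      static StreamMessage wrap(byte[] key, byte[] value, long recordTimestampMs) {
        GenericRow headers = new GenericRow();
        headers.putValue("traceId", "abc-123"); // hypothetical user-provided header
        StreamMessageMetadata metadata = new StreamMessageMetadata(recordTimestampMs, headers);
        return new StreamMessage(key, value, metadata);
      }

      public static void main(String[] args) {
        StreamMessage msg = wrap("id-1".getBytes(StandardCharsets.UTF_8),
            "Alice".getBytes(StandardCharsets.UTF_8), System.currentTimeMillis());
        System.out.println(new String(msg.getValue(), StandardCharsets.UTF_8)); // prints: Alice
      }
    }

Passing null for key and metadata degenerates to the single-argument constructor exercised in the tests below.
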
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
index 9991f34eac..2b0860690a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
@@ -18,26 +18,51 @@
  */
 package org.apache.pinot.spi.stream;
 
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
 /**
  * A class that provides metadata associated with the message of a stream, e.g.,
- * ingestion-timestamp of the message.
+ * the timestamp derived from the incoming record (not the ingestion time).
  */
 public class StreamMessageMetadata implements RowMetadata {
+  private final long _recordIngestionTimeMs;
+  private final GenericRow _headers;
+  private final Map<String, String> _metadata;
 
-  private final long _ingestionTimeMs;
+  public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers) {
+    this(recordIngestionTimeMs, headers, Collections.emptyMap());
+  }
 
   /**
    * Construct the stream based message/row message metadata
    *
-   * @param ingestionTimeMs  the time that the message was ingested by the stream provider
+   * @param recordIngestionTimeMs  the time that the message was ingested by the stream provider;
    *                         use Long.MIN_VALUE if not applicable
+   * @param metadata additional record metadata from the stream source (e.g., offset), may be empty
    */
-  public StreamMessageMetadata(long ingestionTimeMs) {
-    _ingestionTimeMs = ingestionTimeMs;
+  public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable GenericRow headers,
+      Map<String, String> metadata) {
+    _recordIngestionTimeMs = recordIngestionTimeMs;
+    _headers = headers;
+    _metadata = metadata;
+  }
+
+  @Override
+  public long getRecordIngestionTimeMs() {
+    return _recordIngestionTimeMs;
+  }
+
+  @Override
+  public GenericRow getHeaders() {
+    return _headers;
   }
 
   @Override
-  public long getIngestionTimeMs() {
-    return _ingestionTimeMs;
+  public Map<String, String> getRecordMetadata() {
+    return _metadata;
   }
 }
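
The split between headers (user-provided) and record metadata (stream-provided) introduced above can be sketched as follows; the "offset" and "partition" keys are illustrative, not the exact keys defined by KafkaStreamMessageMetadata:

    // Sketch of the headers (user-provided) vs. record metadata (stream-provided) split.
    // The metadata keys below are illustrative, not the exact KafkaStreamMessageMetadata keys.
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.spi.data.readers.GenericRow;
    import org.apache.pinot.spi.stream.StreamMessageMetadata;

    class MetadataSketch {
      static StreamMessageMetadata build(long recordTimestampMs, long offset, int partition) {
        GenericRow headers = new GenericRow();
        headers.putValue("producerTimestamp", recordTimestampMs); // user-space header
        Map<String, String> recordMetadata = new HashMap<>();
        recordMetadata.put("offset", String.valueOf(offset));       // from the stream
        recordMetadata.put("partition", String.valueOf(partition)); // from the stream
        return new StreamMessageMetadata(recordTimestampMs, headers, recordMetadata);
      }
    }
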
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
new file mode 100644
index 0000000000..0483b7d1ac
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import com.google.common.collect.ImmutableSet;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class StreamDataDecoderImplTest {
+  private static final String NAME_FIELD = "name";
+  private static final String AGE_HEADER_KEY = "age";
+  private static final String SEQNO_RECORD_METADATA = "seqNo";
+
+  @Test
+  public void testDecodeValueOnly()
+      throws Exception {
+    TestDecoder messageDecoder = new TestDecoder();
+    messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), "");
+    String value = "Alice";
+    StreamMessage message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8));
+    StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
+    Assert.assertNotNull(result);
+    Assert.assertNull(result.getException());
+    Assert.assertNotNull(result.getResult());
+
+    GenericRow row = result.getResult();
+    Assert.assertEquals(row.getFieldToValueMap().size(), 1);
+    Assert.assertEquals(String.valueOf(row.getValue(NAME_FIELD)), value);
+  }
+
+  @Test
+  public void testDecodeKeyAndHeaders()
+      throws Exception {
+    TestDecoder messageDecoder = new TestDecoder();
+    messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), "");
+    String value = "Alice";
+    String key = "id-1";
+    GenericRow headers = new GenericRow();
+    headers.putValue(AGE_HEADER_KEY, 3);
+    Map<String, String> recordMetadata = Collections.singletonMap(SEQNO_RECORD_METADATA, "1");
+    StreamMessageMetadata metadata = new StreamMessageMetadata(1234L, headers, recordMetadata);
+    StreamMessage message = new StreamMessage(key.getBytes(StandardCharsets.UTF_8),
+        value.getBytes(StandardCharsets.UTF_8), metadata);
+
+    StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
+    Assert.assertNotNull(result);
+    Assert.assertNull(result.getException());
+    Assert.assertNotNull(result.getResult());
+
+    GenericRow row = result.getResult();
+    Assert.assertEquals(row.getFieldToValueMap().size(), 4);
+    Assert.assertEquals(row.getValue(NAME_FIELD), value);
+    Assert.assertEquals(row.getValue(StreamDataDecoderImpl.KEY), key, "Failed to decode record key");
+    Assert.assertEquals(row.getValue(StreamDataDecoderImpl.HEADER_KEY_PREFIX + AGE_HEADER_KEY), 3);
+    Assert.assertEquals(row.getValue(StreamDataDecoderImpl.METADATA_KEY_PREFIX + SEQNO_RECORD_METADATA), "1");
+  }
+
+  @Test
+  public void testNoExceptionIsThrown()
+      throws Exception {
+    ThrowingDecoder messageDecoder = new ThrowingDecoder();
+    messageDecoder.init(Collections.emptyMap(), ImmutableSet.of(NAME_FIELD), "");
+    String value = "Alice";
+    StreamMessage message = new StreamMessage(value.getBytes(StandardCharsets.UTF_8));
+    StreamDataDecoderResult result = new StreamDataDecoderImpl(messageDecoder).decode(message);
+    Assert.assertNotNull(result);
+    Assert.assertNotNull(result.getException());
+    Assert.assertNull(result.getResult());
+  }
+
+  class ThrowingDecoder implements StreamMessageDecoder<byte[]> {
+
+    @Override
+    public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+        throws Exception { }
+
+    @Nullable
+    @Override
+    public GenericRow decode(byte[] payload, GenericRow destination) {
+      throw new RuntimeException("something failed during decoding");
+    }
+
+    @Nullable
+    @Override
+    public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+      return decode(payload, destination);
+    }
+  }
+
+
+  class TestDecoder implements StreamMessageDecoder<byte[]> {
+    @Override
+    public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
+        throws Exception { }
+
+    @Nullable
+    @Override
+    public GenericRow decode(byte[] payload, GenericRow destination) {
+      destination.putValue(NAME_FIELD, new String(payload, StandardCharsets.UTF_8));
+      return destination;
+    }
+
+    @Nullable
+    @Override
+    public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+      return decode(payload, destination);
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
similarity index 51%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
copy to pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
index 9991f34eac..926761109b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMessageTest.java
@@ -18,26 +18,26 @@
  */
 package org.apache.pinot.spi.stream;
 
-/**
- * A class that provides metadata associated with the message of a stream, for e.g.,
- * ingestion-timestamp of the message.
- */
-public class StreamMessageMetadata implements RowMetadata {
+import java.nio.charset.StandardCharsets;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
-  private final long _ingestionTimeMs;
 
-  /**
-   * Construct the stream based message/row message metadata
-   *
-   * @param ingestionTimeMs  the time that the message was ingested by the stream provider
-   *                         use Long.MIN_VALUE if not applicable
-   */
-  public StreamMessageMetadata(long ingestionTimeMs) {
-    _ingestionTimeMs = ingestionTimeMs;
-  }
+public class StreamMessageTest {
+
+  @Test
+  public void testAllowNullKeyAndMetadata() {
+    StreamMessage msg = new StreamMessage("hello".getBytes(StandardCharsets.UTF_8));
+    Assert.assertNull(msg.getKey());
+    Assert.assertNull(msg.getMetadata());
+    Assert.assertEquals(new String(msg.getValue()), "hello");
 
-  @Override
-  public long getIngestionTimeMs() {
-    return _ingestionTimeMs;
+    StreamMessage msg1 = new StreamMessage("key".getBytes(StandardCharsets.UTF_8),
+        "value".getBytes(StandardCharsets.UTF_8), null);
+    Assert.assertNotNull(msg1.getKey());
+    Assert.assertEquals(new String(msg1.getKey()), "key");
+    Assert.assertNotNull(msg1.getValue());
+    Assert.assertEquals(new String(msg1.getValue()), "value");
+    Assert.assertNull(msg1.getMetadata());
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
index 7078168965..93267a394e 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java
@@ -399,7 +399,7 @@ public abstract class QuickStartBase {
         case "meetupRsvp":
          kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(10));
          printStatus(Quickstart.Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****");
-          MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
+          MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(true);
           meetupRSVPProvider.run();
           Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             try {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java
index 87c527f9e1..362ab88daf 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java
@@ -96,7 +96,7 @@ public class PinotRealtimeSource implements AutoCloseable {
         } else {
           _rateLimiter.acquire(rows.size());
           if (!_shutdown) {
-            _producer.produceKeyedBatch(_topicName, rows);
+            _producer.produceKeyedBatch(_topicName, rows, true);
           }
         }
       }
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json
index 41379c2569..0a85aaf577 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_realtime_table_config.json
@@ -38,7 +38,8 @@
       "stream.kafka.broker.list": "localhost:19092",
       "stream.kafka.consumer.prop.auto.offset.reset": "largest",
       "realtime.segment.flush.threshold.time": "12h",
-      "realtime.segment.flush.threshold.size": "100M"
+      "realtime.segment.flush.threshold.size": "10K",
+      "stream.kafka.metadata.populate": "true"
     }
   },
   "metadata": {
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json
index 8109f56d40..93deb2d137 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json
@@ -6,6 +6,14 @@
     }
   ],
   "dimensionFieldSpecs": [
+    {
+      "dataType": "STRING",
+      "name": "__key"
+    },
+    {
+      "dataType": "STRING",
+      "name": "header$producerTimestamp"
+    },
     {
       "dataType": "STRING",
       "name": "venue_name"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
