jackluo923 commented on code in PR #14496:
URL: https://github.com/apache/pinot/pull/14496#discussion_r1850974494
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -150,8 +150,8 @@ private BytesStreamMessage extractStreamMessage(Record record, String shardId) {
String sequenceNumber = record.sequenceNumber();
KinesisPartitionGroupOffset offset = new KinesisPartitionGroupOffset(shardId, sequenceNumber);
// NOTE: Use the same offset as next offset because the consumer starts consuming AFTER the start sequence number.
- StreamMessageMetadata.Builder builder =
- new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp).setOffset(offset, offset);
+ StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp)
+ .setSerializedValueSize(record.data().asByteArray().length).setOffset(offset, offset);
Review Comment:
`record.data().asByteArray().length` does not need an additional null check, because the same expression is already evaluated a few lines above in this method:
```
byte[] value = record.data().asByteArray();
```
I have updated the change to reuse that array:
```
.setSerializedValueSize(value.length)
```
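Reusing `value` also avoids materializing the payload twice. The sketch below assumes, as the AWS SDK for Java 2.x documents for `SdkBytes.asByteArray()`, that each call returns a fresh copy of the bytes. `PayloadBytes` and `MetadataBuilder` are hypothetical stand-ins for `SdkBytes` and `StreamMessageMetadata.Builder`, so the example runs without the AWS SDK or Pinot on the classpath. It only illustrates the "extract once, reuse the length" pattern and is not the consumer's actual code.

```java
import java.util.Arrays;

// Hypothetical stand-ins for SdkBytes and StreamMessageMetadata.Builder (not the real classes).
public class SerializedSizeSketch {

  /** Mimics a payload wrapper whose asByteArray() returns a fresh copy on each call (assumption). */
  static final class PayloadBytes {
    private final byte[] data;
    private int copies = 0;

    PayloadBytes(byte[] data) {
      this.data = data.clone();
    }

    byte[] asByteArray() {
      copies++; // count how many copies have been handed out
      return Arrays.copyOf(data, data.length);
    }

    int copyCount() {
      return copies;
    }
  }

  /** Minimal builder carrying only the field relevant to this review. */
  static final class MetadataBuilder {
    private int serializedValueSize = -1;

    MetadataBuilder setSerializedValueSize(int size) {
      this.serializedValueSize = size;
      return this;
    }

    int serializedValueSize() {
      return serializedValueSize;
    }
  }

  /** Extracts the value once and reuses its length for the size metadata. */
  static MetadataBuilder buildMetadata(PayloadBytes payload) {
    byte[] value = payload.asByteArray(); // single copy of the payload
    // Reuse value.length instead of calling asByteArray() a second time.
    return new MetadataBuilder().setSerializedValueSize(value.length);
  }

  public static void main(String[] args) {
    PayloadBytes payload = new PayloadBytes(new byte[] {1, 2, 3, 4, 5});
    MetadataBuilder builder = buildMetadata(payload);
    System.out.println(builder.serializedValueSize() + " bytes, " + payload.copyCount() + " copy");
  }
}
```

Calling `payload.asByteArray().length` inside `buildMetadata` instead would make `copyCount()` report 2 for the same record.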
##########
pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java:
##########
@@ -84,7 +84,7 @@ static StreamMessageMetadata extractMessageMetadata(Message<byte[]> message, Pul
MessageIdStreamOffset nextOffset = new MessageIdStreamOffset(getNextMessageId(messageId));
StreamMessageMetadata.Builder builder =
new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(recordIngestionTimeMs)
- .setOffset(offset, nextOffset);
+ .setOffset(offset, nextOffset).setSerializedValueSize(message.size());
Review Comment:
The `message` variable is definitely not `null` here: the method already dereferences it several times before the new `message.size()` call, so a `null` message would fail earlier:
```
@VisibleForTesting
static StreamMessageMetadata extractMessageMetadata(Message<byte[]> message, PulsarConfig config) {
  long recordIngestionTimeMs = message.getBrokerPublishTime().orElse(message.getPublishTime());
  MessageId messageId = message.getMessageId();
  MessageIdStreamOffset offset = new MessageIdStreamOffset(messageId);
  MessageIdStreamOffset nextOffset = new MessageIdStreamOffset(getNextMessageId(messageId));
  StreamMessageMetadata.Builder builder =
      new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(recordIngestionTimeMs)
          .setOffset(offset, nextOffset).setSerializedValueSize(message.size());
```
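The null-safety argument can be checked with a small self-contained sketch. `FakeMessage` is a hypothetical stand-in for Pulsar's `Message<byte[]>` and is not the real client API. `extractMetadata` mirrors only the ordering in `extractMessageMetadata`: the message is dereferenced before `size()` is called, so a `null` argument throws `NullPointerException` at the first dereference. The added `size()` call therefore introduces no new null risk.

```java
// Hypothetical stand-in for a Pulsar Message<byte[]>; not the real Pulsar API.
public class NullSafetySketch {

  static final class FakeMessage {
    private final String messageId;
    private final byte[] payload;

    FakeMessage(String messageId, byte[] payload) {
      this.messageId = messageId;
      this.payload = payload;
    }

    String getMessageId() {
      return messageId;
    }

    int size() {
      return payload.length;
    }
  }

  /** Mirrors the ordering in extractMessageMetadata: the message is dereferenced before size(). */
  static String extractMetadata(FakeMessage message) {
    String id = message.getMessageId(); // first dereference: a null message throws here
    int size = message.size();          // only reached when message is non-null
    return id + ":" + size;
  }

  public static void main(String[] args) {
    System.out.println(extractMetadata(new FakeMessage("m-1", new byte[] {7, 8})));
    try {
      extractMetadata(null);
    } catch (NullPointerException e) {
      System.out.println("null message rejected before size() is reached");
    }
  }
}
```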