Jackie-Jiang commented on code in PR #13015: URL: https://github.com/apache/pinot/pull/13015#discussion_r1609266310
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -926,14 +923,14 @@ public Map<String, PartitionLagState> getPartitionToLagState(
    * batchFirstOffset should be less than or equal to startOffset.
    * If batchFirstOffset is greater, then some messages were not received.
    *
-   * @param startOffset The offset of the first message desired, inclusive.
-   * @param batchFirstOffset The offset of the first message in the batch.
+   * @param messageBatch Message batch to validate
    */
-  private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
-    if (batchFirstOffset.compareTo(startOffset) > 0) {
+  private void reportDataLoss(MessageBatch messageBatch) {

Review Comment:
   Please update the java doc for this method

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -926,14 +923,14 @@ public Map<String, PartitionLagState> getPartitionToLagState(
    * batchFirstOffset should be less than or equal to startOffset.
    * If batchFirstOffset is greater, then some messages were not received.
    *
-   * @param startOffset The offset of the first message desired, inclusive.
-   * @param batchFirstOffset The offset of the first message in the batch.
+   * @param messageBatch Message batch to validate
    */
-  private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
-    if (batchFirstOffset.compareTo(startOffset) > 0) {
+  private void reportDataLoss(MessageBatch messageBatch) {
+    if (messageBatch.hasDataLoss()) {
       _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
       String message =
-          "startOffset(" + startOffset + ") is older than topic's beginning offset(" + batchFirstOffset + ")";
+          "Message loss detected in stream partition: " + _partitionGroupId + " for table: " + _tableNameWithType

Review Comment:
   Let's use `String.format()` for readability

##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -66,7 +66,7 @@ public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset s
     KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) startMsgOffset;
     String shardId = startOffset.getShardId();
     String startSequenceNumber = startOffset.getSequenceNumber();
-
+    boolean isMissingMessages = false;

Review Comment:
   Revert this

##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java:
##########
@@ -96,6 +96,10 @@ default boolean isEndOfPartitionGroup() {
     return false;
   }
 
+  default boolean hasDataLoss() {

Review Comment:
   Add some javadoc for this method

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
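
For illustration only, two of the review comments above (the `String.format()` suggestion and the missing Javadoc on `hasDataLoss()`) could be addressed roughly as sketched below. This is not the code that was merged in the PR: `DataLossReportSketch`, `MessageBatchSketch` and `dataLossMessage` are hypothetical names, the interface is a simplified stand-in for `MessageBatch`, the partition id is assumed to be an `int`, and the Javadoc wording is a guess based on the existing comment in the hunk ("some messages were not received"). The quoted hunk cuts the new log message off after `_tableNameWithType`, so only the visible part is formatted here.

```java
final class DataLossReportSketch {

  // Hypothetical, simplified stand-in for the MessageBatch interface under review.
  interface MessageBatchSketch {
    /**
     * Returns {@code true} if the stream consumer detected that some messages before the first
     * message of this batch were not received (assumed meaning, taken from the existing comment
     * in the hunk). Defaults to {@code false} for consumers that cannot detect this.
     */
    default boolean hasDataLoss() {
      return false;
    }
  }

  // Builds the visible part of the log message with String.format() instead of concatenation.
  // The parameter types are assumptions made for this sketch.
  static String dataLossMessage(int partitionGroupId, String tableNameWithType) {
    return String.format("Message loss detected in stream partition: %d for table: %s",
        partitionGroupId, tableNameWithType);
  }

  public static void main(String[] args) {
    // A batch that reports data loss, to exercise the reporting path.
    MessageBatchSketch batch = new MessageBatchSketch() {
      @Override
      public boolean hasDataLoss() {
        return true;
      }
    };
    if (batch.hasDataLoss()) {
      System.out.println(dataLossMessage(3, "myTable_REALTIME"));
    }
  }
}
```

With `String.format()` the message template reads as one string, which is presumably the readability gain the reviewer has in mind; the placeholders would need adjusting if the real field types differ from the ones assumed here.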