Jackie-Jiang commented on code in PR #13015:
URL: https://github.com/apache/pinot/pull/13015#discussion_r1609266310


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -926,14 +923,14 @@ public Map<String, PartitionLagState> getPartitionToLagState(
    * batchFirstOffset should be less than or equal to startOffset.
    * If batchFirstOffset is greater, then some messages were not received.
    *
-   * @param startOffset The offset of the first message desired, inclusive.
-   * @param batchFirstOffset The offset of the first message in the batch.
+   * @param messageBatch Message batch to validate
    */
-  private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
-    if (batchFirstOffset.compareTo(startOffset) > 0) {
+  private void reportDataLoss(MessageBatch messageBatch) {

Review Comment:
   Please update the Javadoc for this method
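One possible shape for the updated Javadoc, written as a minimal self-contained sketch. It assumes the removed `validateStartOffset()` semantics still define data loss (the batch's first offset is greater than the requested start offset). `MessageBatch` is trimmed to the one method used here, `OffsetBatch` is a hypothetical implementation, plain `long` offsets stand in for `StreamPartitionMsgOffset`, and a counter plus a list stand in for `_serverMetrics` and the logger.

```java
import java.util.ArrayList;
import java.util.List;

public class DataLossReporterSketch {

  /** Hypothetical, trimmed-down stand-in for MessageBatch: only the data-loss flag is modeled. */
  interface MessageBatch {
    default boolean hasDataLoss() {
      return false;
    }
  }

  /**
   * Hypothetical batch that derives the flag the way the removed validateStartOffset() did:
   * data loss means the batch's first offset is greater than the requested start offset.
   */
  static final class OffsetBatch implements MessageBatch {
    private final long _startOffset;
    private final long _batchFirstOffset;

    OffsetBatch(long startOffset, long batchFirstOffset) {
      _startOffset = startOffset;
      _batchFirstOffset = batchFirstOffset;
    }

    @Override
    public boolean hasDataLoss() {
      return _batchFirstOffset > _startOffset;
    }
  }

  // Hypothetical stand-ins for the STREAM_DATA_LOSS meter and the segment logger.
  static long streamDataLossCount = 0L;
  static final List<String> errors = new ArrayList<>();

  /**
   * Reports data loss for the given message batch, if the batch indicates any.
   *
   * <p>A batch has data loss when messages between the requested start offset (inclusive) and
   * the first offset of the batch were never received. In that case this method increments the
   * STREAM_DATA_LOSS meter and records an error; otherwise it does nothing.
   *
   * @param messageBatch Message batch to check for data loss
   */
  static void reportDataLoss(MessageBatch messageBatch) {
    if (messageBatch.hasDataLoss()) {
      streamDataLossCount++;
      errors.add("Message loss detected");
    }
  }
}
```

The Javadoc describes both what counts as data loss and the side effects, since the method name alone no longer says that it only reports when the batch flags a gap.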



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -926,14 +923,14 @@ public Map<String, PartitionLagState> getPartitionToLagState(
    * batchFirstOffset should be less than or equal to startOffset.
    * If batchFirstOffset is greater, then some messages were not received.
    *
-   * @param startOffset The offset of the first message desired, inclusive.
-   * @param batchFirstOffset The offset of the first message in the batch.
+   * @param messageBatch Message batch to validate
    */
-  private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
-    if (batchFirstOffset.compareTo(startOffset) > 0) {
+  private void reportDataLoss(MessageBatch messageBatch) {
+    if (messageBatch.hasDataLoss()) {
       _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
       String message =
-          "startOffset(" + startOffset + ") is older than topic's beginning offset(" + batchFirstOffset + ")";
+          "Message loss detected in stream partition: " + _partitionGroupId + " for table: " + _tableNameWithType

Review Comment:
   Let's use `String.format()` for readability
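A minimal sketch of the suggested change. The diff context is cut off after `_tableNameWithType`, so only the two visible fields are shown; any further fields would simply become extra placeholders. The method names and sample values are hypothetical, and `%s` is used for the partition id so the sketch does not depend on its exact type.

```java
public class DataLossMessageSketch {

  /**
   * Builds the data-loss message with String.format() instead of concatenation.
   * %s is used for both placeholders, so the partition id is rendered via toString().
   */
  static String formatMessage(Object partitionGroupId, String tableNameWithType) {
    return String.format("Message loss detected in stream partition: %s for table: %s",
        partitionGroupId, tableNameWithType);
  }

  /** The concatenated form from the diff, kept only to show both produce the same text. */
  static String concatMessage(Object partitionGroupId, String tableNameWithType) {
    return "Message loss detected in stream partition: " + partitionGroupId + " for table: "
        + tableNameWithType;
  }

  public static void main(String[] args) {
    // prints "Message loss detected in stream partition: 3 for table: myTable_REALTIME"
    System.out.println(formatMessage(3, "myTable_REALTIME"));
  }
}
```

With the format string, the whole message template is readable in one literal, and the arguments are listed separately instead of being interleaved with quotes and `+` operators.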



##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -66,7 +66,7 @@ public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset s
     KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) startMsgOffset;
     String shardId = startOffset.getShardId();
     String startSequenceNumber = startOffset.getSequenceNumber();
-
+    boolean isMissingMessages = false;

Review Comment:
   Revert this



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java:
##########
@@ -96,6 +96,10 @@ default boolean isEndOfPartitionGroup() {
     return false;
   }
 
+  default boolean hasDataLoss() {

Review Comment:
   Add some javadoc for this method
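A sketch of Javadoc that could accompany the new default method, assuming the flag carries the same meaning as the old `validateStartOffset()` check (messages between the requested start offset and the batch's first message were never received). The interface is a trimmed, hypothetical stand-in for `MessageBatch`, and `SimpleBatch` is a hypothetical implementation that a consumer might return.

```java
public class HasDataLossSketch {

  /** Trimmed-down, hypothetical stand-in for MessageBatch showing only the new default method. */
  interface MessageBatch {
    /**
     * Returns whether messages were lost between the requested start offset and the first
     * message of this batch, for example because they were removed by retention before they
     * could be consumed.
     *
     * <p>Implementations that cannot detect data loss keep this default.
     *
     * @return {@code true} if data loss was detected for this batch; {@code false} by default
     */
    default boolean hasDataLoss() {
      return false;
    }
  }

  /** Hypothetical batch implementation that carries a data-loss flag set by the consumer. */
  static final class SimpleBatch implements MessageBatch {
    private final boolean _hasDataLoss;

    SimpleBatch(boolean hasDataLoss) {
      _hasDataLoss = hasDataLoss;
    }

    @Override
    public boolean hasDataLoss() {
      return _hasDataLoss;
    }
  }
}
```

Documenting the `false` default matters here because existing stream plugins that never override the method will silently report "no data loss".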



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-h...@pinot.apache.org