navina commented on code in PR #9994:
URL: https://github.com/apache/pinot/pull/9994#discussion_r1050098428
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -585,6 +585,11 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
 realtimeRowsConsumedMeter =
 _serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
 realtimeRowsConsumedMeter);
+ long currentTime = System.currentTimeMillis();
+ long pinotIngestionDelayMs = currentTime - msgMetadata.getRecordIngestionTimeMs();
+ pinotIngestionDelayMs = pinotIngestionDelayMs >= 0 ? pinotIngestionDelayMs : 0;
Review Comment:
When would `System.currentTimeMillis() -
msgMetadata.getRecordIngestionTimeMs()` ever be < 0?
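
One case where the difference could go negative is clock skew: if the ingestion timestamp is stamped by a machine whose clock runs ahead of the consuming server's clock, the subtraction yields a negative number. The sketch below illustrates that case and an equivalent clamp using `Math.max`. The class and method names (`IngestionDelaySketch`, `clampedDelayMs`) are hypothetical and are not part of this PR or of Pinot.

```java
// Hypothetical sketch for discussion only; not the code in this PR.
final class IngestionDelaySketch {
  private IngestionDelaySketch() {
  }

  /**
   * Returns the delay between the record's ingestion timestamp and "now",
   * clamped to zero so that a timestamp from a clock running ahead of ours
   * never produces a negative delay.
   */
  static long clampedDelayMs(long nowMs, long recordIngestionTimeMs) {
    return Math.max(0L, nowMs - recordIngestionTimeMs);
  }

  public static void main(String[] args) {
    long nowMs = 1_000_000L;
    // Timestamp stamped by a clock 250 ms ahead of the consumer: raw difference is -250.
    System.out.println(clampedDelayMs(nowMs, nowMs + 250L)); // prints 0
    // Ordinary case: the record was ingested 400 ms ago.
    System.out.println(clampedDelayMs(nowMs, nowMs - 400L)); // prints 400
  }
}
```

If skew is the only expected cause, a short code comment next to the clamp saying so would make the intent clearer.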
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -611,6 +616,8 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
 if (_segmentLogger.isDebugEnabled()) {
 _segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
 }
+ // Record Pinot ingestion delay as zero since we are up-to-date and no new events
Review Comment:
How would we know that we are up-to-date without querying the source? A
consumer returning an empty batch doesn't necessarily mean we are caught up to
the source. Can you explain why we make this assumption here?
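
To make the concern concrete, here is a minimal sketch of what an explicit check against the source could look like, as opposed to inferring it from an empty batch. Everything in it is an assumption for illustration: `LatestOffsetSource` is a hypothetical interface, not a Pinot or Kafka API, and offsets are modeled as plain `long` values where the latest offset is the offset the next produced message would receive.

```java
// Hypothetical sketch for discussion only; none of these types exist in Pinot.
final class CaughtUpSketch {
  private CaughtUpSketch() {
  }

  /** Hypothetical view of the stream source that can report its latest offset. */
  interface LatestOffsetSource {
    /** Assumed semantics: the offset that the next produced message would receive. */
    long latestOffset();
  }

  /**
   * Returns true only if the consumer's next offset has reached the source's
   * latest offset. An empty batch alone does not establish this, since a fetch
   * can come back empty for other reasons (for example, a timeout).
   */
  static boolean isCaughtUp(long nextOffsetToConsume, LatestOffsetSource source) {
    return nextOffsetToConsume >= source.latestOffset();
  }

  public static void main(String[] args) {
    LatestOffsetSource source = () -> 10L;
    // Consumer has read everything up to offset 9, so its next offset is 10.
    System.out.println(isCaughtUp(10L, source)); // prints true
    // Consumer got an empty batch but is still three messages behind.
    System.out.println(isCaughtUp(7L, source)); // prints false
  }
}
```

Only when a check like this passes would it be safe to report the delay as zero; otherwise the last known delay could be kept.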