noob-se7en commented on code in PR #17598:
URL: https://github.com/apache/pinot/pull/17598#discussion_r2755815696
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java:
##########
@@ -48,32 +50,29 @@ public
FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataMa
super(instanceDataManager, consumingSegments, consumingSegmentsSupplier);
_minFreshnessMs = minFreshnessMs;
_idleTimeoutMs = idleTimeoutMs;
+ _logger.info("FreshnessBasedConsumptionStatusChecker initialized with min_freshness={}ms, idle_timeout={}ms",
+ _minFreshnessMs, _idleTimeoutMs);
}
private boolean segmentHasBeenIdleLongerThanThreshold(long segmentIdleTime) {
return _idleTimeoutMs > 0 && segmentIdleTime > _idleTimeoutMs;
}
- protected long now() {
- return System.currentTimeMillis();
- }
-
@Override
protected boolean isSegmentCaughtUp(String segmentName,
RealtimeSegmentDataManager rtSegmentDataManager,
RealtimeTableDataManager realtimeTableDataManager) {
- long now = now();
- long latestIngestionTimestamp =
- rtSegmentDataManager.getSegment().getSegmentMetadata().getLatestIngestionTimestamp();
- long freshnessMs = now - latestIngestionTimestamp;
+ SegmentMetadata segmentMetadata = rtSegmentDataManager.getSegment().getSegmentMetadata();
+ long minimumIngestionLagMs = segmentMetadata.getMinimumIngestionLagMs();
- // We check latestIngestionTimestamp >= 0 because the default freshness when unknown is Long.MIN_VALUE
- if (latestIngestionTimestamp >= 0 && freshnessMs <= _minFreshnessMs) {
- _logger.info("Segment {} with freshness {}ms has caught up within min freshness {}", segmentName, freshnessMs,
- _minFreshnessMs);
+ // Simple freshness check - if minimum lag ever seen is within threshold, we're caught up.
+ // Note: default value is Long.MAX_VALUE when unknown, which will correctly fail this check.
Review Comment:
Regarding the default value: is it guaranteed, at least in Kafka, that the
record timestamp is always present?
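
To make the concern concrete, here is a minimal sketch of how a "minimum lag
ever seen" value could behave when a record carries no timestamp. All names
here (`FreshnessSketch`, `updateMinLag`, `isCaughtUp`, `NO_TIMESTAMP`) are
hypothetical and are not taken from this PR or from Pinot. The `-1` sentinel
is an assumption based on my understanding that Kafka's consumer reports a
negative timestamp for records without one; please verify against the stream
plugin in use.

```java
// Hypothetical sketch, not the PR's implementation.
class FreshnessSketch {
    // Assumed sentinel for "record has no timestamp" (treated as any negative value below).
    static final long NO_TIMESTAMP = -1L;
    // Default from the diff comment: lag is Long.MAX_VALUE while unknown.
    static final long UNKNOWN_LAG_MS = Long.MAX_VALUE;

    // Returns the new minimum lag after observing one record.
    // Records without a usable timestamp leave the minimum unchanged.
    static long updateMinLag(long currentMinLagMs, long recordTimestampMs, long nowMs) {
        if (recordTimestampMs < 0) {
            return currentMinLagMs;
        }
        // Clamp at zero in case the record timestamp is ahead of the local clock.
        long lagMs = Math.max(0L, nowMs - recordTimestampMs);
        return Math.min(currentMinLagMs, lagMs);
    }

    // Mirrors the "simple freshness check" described in the diff comment.
    static boolean isCaughtUp(long minimumIngestionLagMs, long minFreshnessMs) {
        return minimumIngestionLagMs <= minFreshnessMs;
    }

    public static void main(String[] args) {
        long minLag = UNKNOWN_LAG_MS;
        minLag = updateMinLag(minLag, NO_TIMESTAMP, 1_000L);
        System.out.println("caught up without timestamps: " + isCaughtUp(minLag, 10_000L));
        minLag = updateMinLag(minLag, 900L, 1_000L);
        System.out.println("caught up after a timestamped record: " + isCaughtUp(minLag, 10_000L));
    }
}
```

Under these assumptions, a partition whose records never carry a timestamp
would stay at Long.MAX_VALUE and could only be marked caught up via the idle
timeout, which is why the guarantee matters.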
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]