KKcorps commented on code in PR #14142:
URL: https://github.com/apache/pinot/pull/14142#discussion_r1804791004
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -400,20 +524,35 @@ public long getPartitionEndToEndIngestionDelayMs(int
partitionId) {
}
public long getPartitionIngestionOffsetLag(int partitionId) {
- IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
- if (ingestionInfo == null) {
- return 0;
- }
- StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
- StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
- if (currentOffset == null || latestOffset == null) {
- return 0;
- }
- // TODO: Support other types of offsets
- if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof
LongMsgOffset)) {
+ try {
+ IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+ if (ingestionInfo == null) {
+ return 0;
+ }
+ StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
+ StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
Review Comment:
That's the issue, that we can get the currentOffset only via the
updateIngestionMetric method call
and the latest offset we are calculating in seperate periodic thread
if I just want to store the lag, i'll either have to calculate latestOffset
every time `updateIngestionMetrics` is called with currentOffset for the
partition. I can't do this since it'll defeat the purpose of this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]