noob-se7en commented on code in PR #15831:
URL: https://github.com/apache/pinot/pull/15831#discussion_r2100103527
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -112,6 +135,14 @@ private static class IngestionInfo {
// Cache expire time for ignored segment if there is no update from the
segment.
private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;
+ public static final String OFFSET_LAG_TRACKING_ENABLE_CONFIG_KEY =
"offset.lag.tracking.enable";
+ public static final String OFFSET_LAG_TRACKING_UPDATE_INTERVAL_CONFIG_KEY =
"offset.lag.tracking.update.interval";
+
+ // Since offset lag metric fetches metadata from upstream, we want to make
sure we don't do it too frequently.
+ public static final boolean DEFAULT_ENABLE_OFFSET_LAG_METRIC = false;
+ public static final long DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS = 60000; // 1
minute
+ public static final long MIN_OFFSET_LAG_UPDATE_INTERVAL = 1000L;
Review Comment:
nit:
```suggestion
public static final long MIN_OFFSET_LAG_UPDATE_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(1);
```
We can do the same for the other time-unit values as well.
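Roughly what I have in mind, as a sketch only. The holder class `OffsetLagIntervals` is hypothetical and exists just to make the snippet self-contained; in the PR these constants stay in `IngestionDelayTracker`. The values are the same as the literals in the diff.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder class, for illustration only; not part of the PR.
final class OffsetLagIntervals {
  private OffsetLagIntervals() {
  }

  // Same value as the 60000 literal in the diff, with the unit spelled out.
  static final long DEFAULT_OFFSET_LAG_UPDATE_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);

  // Same value as the 1000L literal in the diff; the _MS suffix matches the default above.
  static final long MIN_OFFSET_LAG_UPDATE_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
}
```

This way the unit is visible in both the name and the value, and the trailing `// 1 minute` comment is no longer needed.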
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -301,7 +301,7 @@ public void updateIngestionMetrics(String segmentName, int
partitionId, long ing
long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset
currentOffset,
@Nullable StreamPartitionMsgOffset latestOffset) {
_ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId,
ingestionTimeMs, firstStreamIngestionTimeMs,
- currentOffset, latestOffset);
+ currentOffset);
Review Comment:
I feel we should not remove this for now. We can keep both delay metrics side
by side until we gain confidence in the new one.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -416,16 +502,74 @@ public long getPartitionIngestionOffsetLag(int
partitionId) {
if (ingestionInfo == null) {
return 0;
}
- StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
- StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
+ return ingestionInfo._offsetLag;
+ }
+
+ private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria
offsetCriteria, long maxWaitTimeMs,
+ StreamMetadataProvider streamMetadataProvider) {
+ try {
+ return streamMetadataProvider.fetchStreamPartitionOffset(offsetCriteria,
maxWaitTimeMs);
+ } catch (Exception e) {
+ LOGGER.debug("Caught exception while fetching stream offset", e);
+ }
+ return null;
+ }
+
+ /**
+ * Creates a new stream metadata provider
+ */
+ private StreamMetadataProvider createPartitionMetadataProvider(String
reason, String clientId, int partitionGroupId) {
+ LOGGER.info("Creating new partition metadata provider, reason: {}",
reason);
+ return _streamConsumerFactory.createPartitionMetadataProvider(clientId,
partitionGroupId);
+ }
+
+ private void updateOffsetLagForAllPartitions() {
+ List<Map.Entry<Integer, IngestionInfo>> entries = new
ArrayList<>(_ingestionInfoMap.entrySet());
Review Comment:
I believe this tracker only works if the consumer thread has already added an
entry to _ingestionInfoMap. I don't think it will work when the consumer dies
before doing that. This has happened in the past, when the offline -> consuming
transition failed.
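To illustrate the concern, here is a simplified, hypothetical model. `LagTrackerSketch` and its method names are made up for this comment and are not the actual `IngestionDelayTracker` code; the map of partition id to lag only stands in for `_ingestionInfoMap`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of the tracker; not the code in this PR.
final class LagTrackerSketch {
  // partitionId -> last known offset lag; stands in for _ingestionInfoMap.
  private final Map<Integer, Long> _ingestionInfoMap = new ConcurrentHashMap<>();

  // In this model, entries are created only when a consumer thread reports metrics.
  void updateIngestionMetrics(int partitionId, long offsetLag) {
    _ingestionInfoMap.put(partitionId, offsetLag);
  }

  // Same shape as updateOffsetLagForAllPartitions(): only existing entries are visited.
  List<Integer> partitionsVisitedByPeriodicUpdate() {
    List<Integer> visited = new ArrayList<>(_ingestionInfoMap.keySet());
    Collections.sort(visited);
    return visited;
  }
}
```

If partition 0 reports metrics but the consumer for partition 1 fails before its first update, the periodic pass only sees partition 0, so no lag is ever emitted for partition 1.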
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]