Jackie-Jiang commented on code in PR #9994:
URL: https://github.com/apache/pinot/pull/9994#discussion_r1062020489
##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java:
##########
@@ -45,8 +45,10 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
// Dedup metrics
DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
CONSUMPTION_QUOTA_UTILIZATION("ratio", false),
- JVM_HEAP_USED_BYTES("bytes", true);
-
+ JVM_HEAP_USED_BYTES("bytes", true),
+ // Lag metrics
+ TABLE_MAX_INGESTION_DELAY_MS("milliseconds", false),
+ TABLE_PER_PARTITION_INGESTION_DELAY_MS("milliseconds", false);
Review Comment:
I think we can use the same gauge for both table level and partition level.
For partition level, we will suffix it with `.partitionId`.
```suggestion
MAX_INGESTION_DELAY_MS("milliseconds", false);
```
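A minimal sketch of the single-gauge idea. It assumes the partition-level key is formed by appending `.<partitionId>` to the table-level key. `DelayGauge`, `DelayGaugeRegistry`, and the key layout are hypothetical stand-ins, not Pinot's `ServerGauge`/`ServerMetrics` API:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a single gauge entry (not Pinot's ServerGauge).
enum DelayGauge {
  MAX_INGESTION_DELAY_MS("milliseconds");

  private final String unit;

  DelayGauge(String unit) {
    this.unit = unit;
  }

  String getUnit() {
    return unit;
  }
}

// Hypothetical in-memory registry: one gauge serves both levels, only the key differs.
final class DelayGaugeRegistry {
  private final Map<String, Long> values = new TreeMap<>();

  // Table-level key, e.g. "MAX_INGESTION_DELAY_MS.myTable_REALTIME" (assumed layout).
  static String tableKey(DelayGauge gauge, String table) {
    return gauge.name() + "." + table;
  }

  // Partition-level key: the table-level key suffixed with ".<partitionId>".
  static String partitionKey(DelayGauge gauge, String table, int partitionId) {
    return tableKey(gauge, table) + "." + partitionId;
  }

  void setTableValue(DelayGauge gauge, String table, long valueMs) {
    values.put(tableKey(gauge, table), valueMs);
  }

  void setPartitionValue(DelayGauge gauge, String table, int partitionId, long valueMs) {
    values.put(partitionKey(gauge, table, partitionId), valueMs);
  }

  // Returns null if nothing has been recorded under the key.
  Long get(String key) {
    return values.get(key);
  }
}
```

With this layout, dashboards can aggregate over one metric name and filter by suffix, instead of tracking two separate gauge definitions.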
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1554,6 +1557,25 @@ private void createPartitionMetadataProvider(String reason) {
_partitionMetadataProvider =
_streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
}
+ /**
+ * Updates the ingestion delay, using the timestamp of the last consumed event, if any messages were processed.
+ *
+ * @param indexedMessageCount number of messages indexed
+ */
+ private void updateIngestionDelay(int indexedMessageCount) {
+ if (_catchingUpPhase) {
Review Comment:
If this is `false` in the beginning, we also need to update the ingestion
delay when it becomes `true`, or the lag will already be propagated.
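A minimal sketch of one way to act on this: when reporting is switched on, publish the delay computed from the last consumed event right away instead of waiting for the next batch of messages. `IngestionDelayTracker`, `enableReporting()`, and the injected clock/gauge callbacks are hypothetical names, not the fields or methods in `LLRealtimeSegmentDataManager`:

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical per-partition tracker; names are illustrative only.
final class IngestionDelayTracker {
  private static final long NO_EVENT = Long.MIN_VALUE; // nothing consumed yet

  private final LongSupplier clockMs;     // wall-clock time source, injectable for tests
  private final LongConsumer gaugeSetter; // receives the delay to publish
  private volatile boolean reportingEnabled; // false while catching up at startup
  private volatile long lastEventTimestampMs = NO_EVENT;

  IngestionDelayTracker(LongSupplier clockMs, LongConsumer gaugeSetter, boolean reportingEnabled) {
    this.clockMs = clockMs;
    this.gaugeSetter = gaugeSetter;
    this.reportingEnabled = reportingEnabled;
  }

  // Called after each batch with the timestamp of the last consumed event.
  void onMessagesIndexed(int indexedMessageCount, long lastEventTimestampMs) {
    if (indexedMessageCount <= 0) {
      return; // nothing new was consumed, keep the previous timestamp
    }
    this.lastEventTimestampMs = lastEventTimestampMs;
    if (reportingEnabled) {
      publish();
    }
  }

  // Called when the partition is considered caught up. Publishes immediately,
  // so the gauge does not wait for the next batch to reflect the current lag.
  void enableReporting() {
    reportingEnabled = true;
    publish();
  }

  private void publish() {
    long last = lastEventTimestampMs;
    if (last == NO_EVENT) {
      return; // no event yet, so there is no delay to compute
    }
    gaugeSetter.accept(Math.max(0L, clockMs.getAsLong() - last));
  }
}
```

Injecting the clock keeps the delay arithmetic testable without sleeping; a real implementation would presumably use `System.currentTimeMillis()`.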
##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -77,6 +77,9 @@ public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
LLRealtimeSegmentDataManager rtSegmentDataManager =
(LLRealtimeSegmentDataManager) segmentDataManager;
if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
_caughtUpSegments.add(segName);
+ rtSegmentDataManager.notifyConsumptionCaughtUp(false);
Review Comment:
We want to skip reporting delay during server startup because:
1. The server will need to catch up, which will very likely trigger the alert.
2. The server is not really serving queries yet, so it is actually a false alarm.
We do want to report delay when a new consuming segment is created because the server is serving queries from it.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -69,4 +69,13 @@ protected static PinotDataBufferMemoryManager getMemoryManager(String consumerDi
public abstract Map<String, PartitionLagState> getPartitionToLagState(
Map<String, ConsumerPartitionState> consumerPartitionStateMap);
+
+ /**
+ * The RT segment data manager can handle status change from external components like the ConsumptionStatusChecker
+ * etc. Currently, it acts as a way to signal the RT Segment data manager that the current partition has caught up.
+ *
+ * @param caughtUpWithUpstream Boolean indicating if the partition has caught up with upstream source or not based on
+ * the strategy used in the {@literal IngestionBasedConsumptionStatusChecker}
+ */
+ public abstract void notifyConsumptionCaughtUp(boolean caughtUpWithUpstream);
Review Comment:
This is quite confusing. Per the implementation, it seems that passing `false` means the segment is caught up?
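One way to remove the ambiguity, sketched under the assumption that only a one-way "caught up" transition is needed: carry the state in an enum and give the transition its own method, so no caller has to remember which boolean value means what. `ConsumptionState`, `ConsumptionStatus`, and `markCaughtUp()` are hypothetical names, not part of the PR:

```java
// Hypothetical states; the name of each constant states its meaning directly.
enum ConsumptionState {
  CATCHING_UP,
  CAUGHT_UP
}

// Hypothetical holder that replaces notifyConsumptionCaughtUp(boolean).
final class ConsumptionStatus {
  private volatile ConsumptionState state; // may be updated from the status checker thread

  ConsumptionStatus(ConsumptionState initialState) {
    this.state = initialState;
  }

  // Invoked by an external checker once the partition has caught up with upstream.
  void markCaughtUp() {
    state = ConsumptionState.CAUGHT_UP;
  }

  boolean isCaughtUp() {
    return state == ConsumptionState.CAUGHT_UP;
  }

  ConsumptionState getState() {
    return state;
  }
}
```

A call site then reads as `status.markCaughtUp()` rather than `notifyConsumptionCaughtUp(false)`.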
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1594,4 +1616,9 @@ public String getSegmentName() {
public void forceCommit() {
_forceCommitMessageReceived = true;
}
+
+ @Override
+ public void notifyConsumptionCaughtUp(boolean catchingUpPhase) {
Review Comment:
Another way to handle it is to assume the segment is caught up in the beginning, and let the status checker mark it as not caught up yet. The status checker only monitors the initial consuming segments during restart. New consuming segments won't be tracked, and thus will be in the caught-up phase automatically.
I think this PR is already taking this approach.
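The "caught up by default" approach can be sketched as follows. `CatchUpTracking` and its methods are hypothetical, and the sketch assumes the status checker knows which consuming segments existed at startup:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: every consuming segment starts as caught up; only the
// segments present at server startup are flipped to "catching up" by the
// status checker until they meet its ingestion criteria.
final class CatchUpTracking {
  private final Map<String, Boolean> caughtUpBySegment = new HashMap<>();

  // Any newly created consuming segment defaults to caught up.
  void onConsumingSegmentCreated(String segmentName) {
    caughtUpBySegment.put(segmentName, true);
  }

  // At startup, the status checker marks the initial consuming segments as not caught up.
  void markStartupSegmentsCatchingUp(Set<String> startupSegments) {
    for (String segment : startupSegments) {
      caughtUpBySegment.put(segment, false);
    }
  }

  // The status checker calls this once a startup segment meets its criteria.
  void markCaughtUp(String segmentName) {
    caughtUpBySegment.put(segmentName, true);
  }

  // Delay is reported only for caught-up segments; untracked segments default to true.
  boolean shouldReportDelay(String segmentName) {
    return caughtUpBySegment.getOrDefault(segmentName, true);
  }
}
```

Because untracked segments default to caught up, segments created after startup need no extra signal from the checker.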