This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 008ad90a530 Skip Freshness check for streams not supporting offset lag
(#17560)
008ad90a530 is described below
commit 008ad90a53062577e6ede2d88da3d9fa49b65118
Author: NOOB <[email protected]>
AuthorDate: Sun Jan 25 09:45:43 2026 +0530
Skip Freshness check for streams not supporting offset lag (#17560)
---
.../IngestionBasedConsumptionStatusChecker.java | 20 +++++++++-
...FreshnessBasedConsumptionStatusCheckerTest.java | 41 +++++++++++++++++++++
.../OffsetBasedConsumptionStatusCheckerTest.java | 43 ++++++++++++++++++++++
3 files changed, 103 insertions(+), 1 deletion(-)
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
index ef389f645ef..e92471c42df 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
@@ -31,6 +31,7 @@ import
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +74,8 @@ public abstract class IngestionBasedConsumptionStatusChecker {
}
Set<String> consumingSegments = tableSegments.getValue();
Set<String> caughtUpSegments =
_caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new
HashSet<>());
+ boolean skippedSegmentsLogged = false;
+
for (String segName : consumingSegments) {
if (caughtUpSegments.contains(segName)) {
continue;
@@ -95,7 +98,22 @@ public abstract class IngestionBasedConsumptionStatusChecker
{
continue;
}
RealtimeSegmentDataManager rtSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
- if (isSegmentCaughtUp(segName, rtSegmentDataManager,
(RealtimeTableDataManager) tableDataManager)) {
+ RealtimeTableDataManager realtimeTableDataManager =
(RealtimeTableDataManager) tableDataManager;
+
+ StreamMetadataProvider streamMetadataProvider =
+
realtimeTableDataManager.getStreamMetadataProvider(rtSegmentDataManager);
+
+ if (!streamMetadataProvider.supportsOffsetLag()) {
+ // Cannot conclude if segment has caught up or not. Skip such
segments.
+ if (!skippedSegmentsLogged) {
+ _logger.warn(
+ "Stream provider for table: {} does not support offset
subtraction. Cannot conclude if the segment "
+ + "has caught up. Skipping the segments.",
+ realtimeTableDataManager.getTableName());
+ skippedSegmentsLogged = true;
+ }
+ caughtUpSegments.add(segName);
+ } else if (isSegmentCaughtUp(segName, rtSegmentDataManager,
realtimeTableDataManager)) {
caughtUpSegments.add(segName);
}
} finally {
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index 0953045bc80..a235b1d4382 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -119,7 +119,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA0.getSegmentName()).thenReturn(segA0);
+ when(segMngrA1.getSegmentName()).thenReturn(segA1);
+ when(segMngrB0.getSegmentName()).thenReturn(segB0);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+ when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+ when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
when(segB0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
@@ -174,7 +180,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA0.getSegmentName()).thenReturn(segA0);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
@@ -247,7 +255,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA0.getSegmentName()).thenReturn(segA0);
+ when(segMngrA1.getSegmentName()).thenReturn(segA1);
+ when(segMngrB0.getSegmentName()).thenReturn(segB0);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+ when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+ when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
when(segB0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
@@ -325,7 +339,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA0.getSegmentName()).thenReturn(segA0);
+ when(segMngrA1.getSegmentName()).thenReturn(segA1);
+ when(segMngrB0.getSegmentName()).thenReturn(segB0);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+ when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+ when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
when(segB0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
@@ -414,7 +434,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA0.getSegmentName()).thenReturn(segA0);
+ when(segMngrA1.getSegmentName()).thenReturn(segA1);
+ when(segMngrB0.getSegmentName()).thenReturn(segB0);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+ when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+ when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
when(segB0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
@@ -479,7 +505,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA0.getSegmentName()).thenReturn(segA0);
+ when(segMngrA1.getSegmentName()).thenReturn(segA1);
+ when(segMngrB0.getSegmentName()).thenReturn(segB0);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+ when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+ when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
when(segB0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
@@ -555,7 +587,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA0.getSegmentName()).thenReturn(segA0);
+ when(segMngrA1.getSegmentName()).thenReturn(segA1);
+ when(segMngrB0.getSegmentName()).thenReturn(segB0);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+ when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+ when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segA1Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
when(segB0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of());
@@ -580,6 +618,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
// segB0 0 0 100
0
setupLatestIngestionTimestamp(segMngrA0, 89L);
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+ when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(0)));
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
}
@@ -620,6 +659,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getSegmentName()).thenReturn(segA0);
when(segMngrA1.getSegmentName()).thenReturn(segA1);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+ when(segA1Provider.supportsOffsetLag()).thenReturn(true);
// segA0 provider throws RuntimeException - this should be caught and
handled gracefully
// In practice, RealtimeSegmentMetadataUtils wraps TimeoutException in
RuntimeException
when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenThrow(
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
index e63b10fafa0..ba1c85f9afb 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
@@ -28,6 +28,7 @@ import
org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
@@ -64,6 +65,17 @@ public class OffsetBasedConsumptionStatusCheckerTest {
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+ when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+ when(segB0Provider.supportsOffsetLag()).thenReturn(true);
+
when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new
LongMsgOffset(15));
when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new
LongMsgOffset(150));
when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new
LongMsgOffset(1500));
@@ -117,6 +129,13 @@ public class OffsetBasedConsumptionStatusCheckerTest {
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+ when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+
// latest ingested offset latest stream offset
// segA0 10 15
// segA1 100 150
@@ -131,6 +150,10 @@ public class OffsetBasedConsumptionStatusCheckerTest {
RealtimeSegmentDataManager segMngrB0 =
mock(RealtimeSegmentDataManager.class);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+ StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+ when(segB0Provider.supportsOffsetLag()).thenReturn(true);
+
// latest ingested offset latest stream offset
// segA0 20 15
// segA1 200 150
@@ -179,6 +202,16 @@ public class OffsetBasedConsumptionStatusCheckerTest {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+ when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+ when(segB0Provider.supportsOffsetLag()).thenReturn(true);
+
// latest ingested offset latest stream offset
// segA0 10 15
// segA1 100 150
@@ -234,6 +267,16 @@ public class OffsetBasedConsumptionStatusCheckerTest {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+ when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+ when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+ when(segB0Provider.supportsOffsetLag()).thenReturn(true);
+
// latest ingested offset latest stream offset
// segA0 10 15
// segA1 100 150
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]