This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 008ad90a530 Skip Freshness check for streams not supporting offset lag (#17560)
008ad90a530 is described below

commit 008ad90a53062577e6ede2d88da3d9fa49b65118
Author: NOOB <[email protected]>
AuthorDate: Sun Jan 25 09:45:43 2026 +0530

    Skip Freshness check for streams not supporting offset lag (#17560)
---
 .../IngestionBasedConsumptionStatusChecker.java    | 20 +++++++++-
 ...FreshnessBasedConsumptionStatusCheckerTest.java | 41 +++++++++++++++++++++
 .../OffsetBasedConsumptionStatusCheckerTest.java   | 43 ++++++++++++++++++++++
 3 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
index ef389f645ef..e92471c42df 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,6 +74,8 @@ public abstract class IngestionBasedConsumptionStatusChecker {
       }
       Set<String> consumingSegments = tableSegments.getValue();
       Set<String> caughtUpSegments = _caughtUpSegmentsByTable.computeIfAbsent(tableNameWithType, k -> new HashSet<>());
+      boolean skippedSegmentsLogged = false;
+
       for (String segName : consumingSegments) {
         if (caughtUpSegments.contains(segName)) {
           continue;
@@ -95,7 +98,22 @@ public abstract class IngestionBasedConsumptionStatusChecker {
             continue;
           }
           RealtimeSegmentDataManager rtSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
-          if (isSegmentCaughtUp(segName, rtSegmentDataManager, (RealtimeTableDataManager) tableDataManager)) {
+          RealtimeTableDataManager realtimeTableDataManager = (RealtimeTableDataManager) tableDataManager;
+
+          StreamMetadataProvider streamMetadataProvider =
+              realtimeTableDataManager.getStreamMetadataProvider(rtSegmentDataManager);
+
+          if (!streamMetadataProvider.supportsOffsetLag()) {
+            // Cannot conclude if segment has caught up or not. Skip such segments.
+            if (!skippedSegmentsLogged) {
+              _logger.warn(
+                  "Stream provider for table: {} does not support offset subtraction. Cannot conclude if the segment "
+                      + "has caught up. Skipping the segments.",
+                  realtimeTableDataManager.getTableName());
+              skippedSegmentsLogged = true;
+            }
+            caughtUpSegments.add(segName);
+          } else if (isSegmentCaughtUp(segName, rtSegmentDataManager, realtimeTableDataManager)) {
             caughtUpSegments.add(segName);
           }
         } finally {
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index 0953045bc80..a235b1d4382 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -119,7 +119,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA0.getSegmentName()).thenReturn(segA0);
+    when(segMngrA1.getSegmentName()).thenReturn(segA1);
+    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+    when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
     when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
     when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
@@ -174,7 +180,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
     
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA0.getSegmentName()).thenReturn(segA0);
 
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
 
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
@@ -247,7 +255,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA0.getSegmentName()).thenReturn(segA0);
+    when(segMngrA1.getSegmentName()).thenReturn(segA1);
+    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+    when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
     when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
     when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
@@ -325,7 +339,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA0.getSegmentName()).thenReturn(segA0);
+    when(segMngrA1.getSegmentName()).thenReturn(segA1);
+    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+    when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
     when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
     when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
@@ -414,7 +434,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA0.getSegmentName()).thenReturn(segA0);
+    when(segMngrA1.getSegmentName()).thenReturn(segA1);
+    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+    when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
     when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
     when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
@@ -479,7 +505,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA0.getSegmentName()).thenReturn(segA0);
+    when(segMngrA1.getSegmentName()).thenReturn(segA1);
+    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+    when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
     when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
     when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(2000)));
@@ -555,7 +587,13 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
+    when(segMngrA0.getSegmentName()).thenReturn(segA0);
+    when(segMngrA1.getSegmentName()).thenReturn(segA1);
+    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+    when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
     when(segA1Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(200)));
     when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of());
@@ -580,6 +618,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     // segB0              0                       0                      100   
            0
     setupLatestIngestionTimestamp(segMngrA0, 89L);
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
+    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
     when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(0)));
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
   }
@@ -620,6 +659,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getSegmentName()).thenReturn(segA0);
     when(segMngrA1.getSegmentName()).thenReturn(segA1);
 
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+    when(segA1Provider.supportsOffsetLag()).thenReturn(true);
     // segA0 provider throws RuntimeException - this should be caught and 
handled gracefully
     // In practice, RealtimeSegmentMetadataUtils wraps TimeoutException in 
RuntimeException
     when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenThrow(
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
index e63b10fafa0..ba1c85f9afb 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/OffsetBasedConsumptionStatusCheckerTest.java
@@ -28,6 +28,7 @@ import 
org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -64,6 +65,17 @@ public class OffsetBasedConsumptionStatusCheckerTest {
     when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
     when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
     when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+    
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+    when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
+
     when(segMngrA0.getLatestStreamOffsetAtStartupTime()).thenReturn(new 
LongMsgOffset(15));
     when(segMngrA1.getLatestStreamOffsetAtStartupTime()).thenReturn(new 
LongMsgOffset(150));
     when(segMngrB0.getLatestStreamOffsetAtStartupTime()).thenReturn(new 
LongMsgOffset(1500));
@@ -117,6 +129,13 @@ public class OffsetBasedConsumptionStatusCheckerTest {
     when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
     when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
 
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+    when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+
     //           latest ingested offset    latest stream offset
     // segA0               10                     15
     // segA1               100                    150
@@ -131,6 +150,10 @@ public class OffsetBasedConsumptionStatusCheckerTest {
     RealtimeSegmentDataManager segMngrB0 = 
mock(RealtimeSegmentDataManager.class);
     when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
 
+    StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+    
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
+
     //           latest ingested offset     latest stream offset
     // segA0               20                      15
     // segA1               200                     150
@@ -179,6 +202,16 @@ public class OffsetBasedConsumptionStatusCheckerTest {
     when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
     when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
 
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+    
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+    when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
+
     //           latest ingested offset    latest stream offset
     // segA0              10                       15
     // segA1              100                      150
@@ -234,6 +267,16 @@ public class OffsetBasedConsumptionStatusCheckerTest {
     when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
     when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
 
+    StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+    StreamMetadataProvider segB0Provider = mock(StreamMetadataProvider.class);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+    
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+    
when(tableDataManagerB.getStreamMetadataProvider(segMngrB0)).thenReturn(segB0Provider);
+    when(segA0Provider.supportsOffsetLag()).thenReturn(true);
+    when(segA1Provider.supportsOffsetLag()).thenReturn(true);
+    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
+
     //           latest ingested offset    latest stream offset
     // segA0              10                       15
     // segA1              100                      150


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to