This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4c7fd6f333f Continue Freshness check if latest stream offset fetch fails (#17563)
4c7fd6f333f is described below
commit 4c7fd6f333f0159680bf9d97abb75a47ebd2e9ef
Author: NOOB <[email protected]>
AuthorDate: Fri Jan 23 20:12:28 2026 +0530
Continue Freshness check if latest stream offset fetch fails (#17563)
* Continue Freshness check if latest stream offset fetch fails
* Adds nullable annotation
* Fixes test
* Fixes lint
---
.../FreshnessBasedConsumptionStatusChecker.java | 15 +++--
.../IngestionBasedConsumptionStatusChecker.java | 5 +-
...FreshnessBasedConsumptionStatusCheckerTest.java | 68 ++++++++++++++++++++++
3 files changed, 82 insertions(+), 6 deletions(-)
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 13dd8075841..5a02a91f2dd 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -78,11 +78,18 @@ public class FreshnessBasedConsumptionStatusChecker extends
IngestionBasedConsum
// the stream consumer to check partition count if we're already caught up.
StreamPartitionMsgOffset currentOffset =
rtSegmentDataManager.getCurrentOffset();
- StreamMetadataProvider streamMetadataProvider =
-
realtimeTableDataManager.getStreamMetadataProvider(rtSegmentDataManager);
- StreamPartitionMsgOffset latestStreamOffset =
-
RealtimeSegmentMetadataUtils.fetchLatestStreamOffset(rtSegmentDataManager,
streamMetadataProvider);
+ StreamPartitionMsgOffset latestStreamOffset = null;
+ try {
+ StreamMetadataProvider streamMetadataProvider =
+
realtimeTableDataManager.getStreamMetadataProvider(rtSegmentDataManager);
+ latestStreamOffset =
+
RealtimeSegmentMetadataUtils.fetchLatestStreamOffset(rtSegmentDataManager,
streamMetadataProvider);
+ } catch (Exception e) {
+ _logger.warn("Failed to fetch latest stream offset for segment: {}. Will
continue with other checks.",
+ segmentName, e);
+ }
+ // Check if we're caught up (isOffsetCaughtUp handles null
latestStreamOffset by returning false)
if (isOffsetCaughtUp(segmentName, currentOffset, latestStreamOffset)) {
_logger.info("Segment {} with freshness {}ms has not caught up within
min freshness {}. "
+ "But the current ingested offset is equal to the latest
available offset {}.", segmentName, freshnessMs,
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
index 56ed05920e3..ef389f645ef 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
+import javax.annotation.Nullable;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
@@ -139,8 +140,8 @@ public abstract class
IngestionBasedConsumptionStatusChecker {
protected abstract boolean isSegmentCaughtUp(String segmentName,
RealtimeSegmentDataManager rtSegmentDataManager,
RealtimeTableDataManager realtimeTableDataManager);
- protected boolean isOffsetCaughtUp(String segmentName,
- StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset
latestOffset) {
+ protected boolean isOffsetCaughtUp(String segmentName, @Nullable
StreamPartitionMsgOffset currentOffset,
+ @Nullable StreamPartitionMsgOffset latestOffset) {
if (currentOffset != null && latestOffset != null) {
// Kafka's "latest" offset is actually the next available offset.
Therefore it will be 1 ahead of the
// current offset in the case we are caught up.
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index 40b3a40e2e9..0953045bc80 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -582,4 +583,71 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segB0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(0)));
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
}
+
+ @Test
+ public void testTimeoutExceptionWhenFetchingLatestStreamOffset() {
+ String segA0 = "tableA__0__0__123Z";
+ String segA1 = "tableA__1__0__123Z";
+ Map<String, Set<String>> consumingSegments = new HashMap<>();
+ consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
+ InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+ long idleTimeoutMs = 10L;
+ FreshnessBasedConsumptionStatusChecker statusChecker =
+ new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
idleTimeoutMs, 100L);
+
+ // TableDataManager is not set up yet
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
2);
+
+ // setup TableDataManager
+ RealtimeTableDataManager tableDataManagerA =
mock(RealtimeTableDataManager.class);
+
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+
+ // setup SegmentDataManagers
+ RealtimeSegmentDataManager segMngrA0 =
mock(RealtimeSegmentDataManager.class);
+ RealtimeSegmentDataManager segMngrA1 =
mock(RealtimeSegmentDataManager.class);
+ when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+ when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+
+ StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
+ StreamMetadataProvider segA1Provider = mock(StreamMetadataProvider.class);
+
+
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
+
when(tableDataManagerA.getStreamMetadataProvider(segMngrA1)).thenReturn(segA1Provider);
+
+ when(segMngrA0.getStreamPartitionId()).thenReturn(0);
+ when(segMngrA1.getStreamPartitionId()).thenReturn(1);
+ when(segMngrA0.getSegmentName()).thenReturn(segA0);
+ when(segMngrA1.getSegmentName()).thenReturn(segA1);
+
+ // segA0 provider throws RuntimeException - this should be caught and
handled gracefully
+ // In practice, RealtimeSegmentMetadataUtils wraps TimeoutException in
RuntimeException
+ when(segA0Provider.fetchLatestStreamOffset(anySet(), anyLong())).thenThrow(
+ new RuntimeException("Failed to fetch latest stream offset for
segment: " + segA0,
+ new TimeoutException("Timeout fetching latest stream offset")));
+ // segA1 provider works normally
+ when(segA1Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(1, new LongMsgOffset(20)));
+
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ // ensure negative values are ignored
+ setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
+ setupLatestIngestionTimestamp(segMngrA1, Long.MIN_VALUE);
+
+ // segA0 has idle time below threshold, segA1 has idle time above threshold
+ when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs
- 1);
+ when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs
+ 1);
+
+ // segA0: timeout exception when fetching latest offset, but idle time is
below threshold
+ // - should not be caught up (can't determine from offset, and
idle time not exceeded)
+ // segA1: can fetch latest offset (10 < 20), but idle time exceeds
threshold
+ // - should be caught up due to idle timeout
+ // Expected: 1 segment not caught up (segA0)
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
1);
+
+ // Now make segA0 also exceed idle timeout - it should be caught up
despite timeout exception
+ when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs
+ 1);
+ // Expected: 0 segments not caught up (both exceed idle timeout)
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]