This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 08edbc98106 freshness checkers uses minimum ingestion lag (#17598)
08edbc98106 is described below

commit 08edbc98106af3b67bc46461684ee42bdb51818c
Author: Johan Adami <[email protected]>
AuthorDate: Mon Feb 9 20:09:27 2026 -0500

    freshness checkers uses minimum ingestion lag (#17598)
---
 .../indexsegment/mutable/MutableSegmentImpl.java   |  10 +
 .../apache/pinot/segment/spi/SegmentMetadata.java  |   9 +
 .../spi/index/metadata/SegmentMetadataImpl.java    |   5 +
 .../FreshnessBasedConsumptionStatusChecker.java    |  50 ++---
 ...FreshnessBasedConsumptionStatusCheckerTest.java | 245 +++++++++------------
 5 files changed, 149 insertions(+), 170 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 2e8d9bb4bf5..43150d61c59 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -191,6 +191,7 @@ public class MutableSegmentImpl implements MutableSegment {
   // default message metadata
   private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
   private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;
+  private volatile long _minimumIngestionLagMs = Long.MAX_VALUE;
 
   // multi-column text index fields
   private final MultiColumnRealtimeLuceneTextIndex _multiColumnTextIndex;
@@ -224,6 +225,11 @@ public class MutableSegmentImpl implements MutableSegment {
         return _latestIngestionTimeMs;
       }
 
+      @Override
+      public long getMinimumIngestionLagMs() {
+        return _minimumIngestionLagMs;
+      }
+
       @Override
       public boolean isMutableSegment() {
         return true;
@@ -701,6 +707,10 @@ public class MutableSegmentImpl implements MutableSegment {
     _lastIndexedTimeMs = System.currentTimeMillis();
     if (metadata != null) {
       _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, 
metadata.getRecordIngestionTimeMs());
+      // if for some reason the record ingestion time is in the future, we 
should not
+      // update the minimum ingestion lag past 0
+      long ingestionLagMs = Math.max(0, _lastIndexedTimeMs - 
_latestIngestionTimeMs);
+      _minimumIngestionLagMs = Math.min(_minimumIngestionLagMs, 
ingestionLagMs);
     }
 
     return canTakeMore;
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
index e76a540c975..eff83442d2e 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
@@ -95,6 +95,15 @@ public interface SegmentMetadata {
    */
   long getLatestIngestionTimestamp();
 
+  /**
+   * Return the minimum ingestion lag recorded for this segment. Ingestion lag 
is
+   * the difference between the record ingestion timestamp and current system 
time.
+   * Applicable for MutableSegments.
+   *
+   * @return minimum ingestion lag recorded for this segment
+   */
+  long getMinimumIngestionLagMs();
+
   @Nullable
   List<StarTreeV2Metadata> getStarTreeV2MetadataList();
 
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index 5a79535fead..1f3a0c9a52e 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -438,6 +438,11 @@ public class SegmentMetadataImpl implements 
SegmentMetadata {
     return Long.MIN_VALUE;
   }
 
+  @Override
+  public long getMinimumIngestionLagMs() {
+    return Long.MAX_VALUE;
+  }
+
   @Nullable
   @Override
   public List<StarTreeV2Metadata> getStarTreeV2MetadataList() {
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 5a02a91f2dd..d7d97da7212 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -26,6 +26,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import 
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentMetadataUtils;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 
@@ -35,8 +36,9 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
  * execution happens and consumers try to catch up to the latest messages 
available in streams.
  * To achieve this, every time status check is called - {@link 
#getNumConsumingSegmentsNotReachedIngestionCriteria} -
  * for each consuming segment, we check if either:
- *   - the segment's latest ingested offset has reached the current stream 
offset that's
- *   - the last ingested message is within {@link #_minFreshnessMs} of the 
current system time
+ *   - the segment's minimum ingestion lag is within {@link #_minFreshnessMs}
+ *   - the segment's latest ingested offset has reached the current stream 
offset
+ *   - the segment has been idle longer than {@link #_idleTimeoutMs}
  */
 public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsumptionStatusChecker {
   private final long _minFreshnessMs;
@@ -48,32 +50,29 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
     super(instanceDataManager, consumingSegments, consumingSegmentsSupplier);
     _minFreshnessMs = minFreshnessMs;
     _idleTimeoutMs = idleTimeoutMs;
+    _logger.info("FreshnessBasedConsumptionStatusChecker initialized with 
min_freshness={}ms, idle_timeout={}ms",
+        _minFreshnessMs, _idleTimeoutMs);
   }
 
   private boolean segmentHasBeenIdleLongerThanThreshold(long segmentIdleTime) {
     return _idleTimeoutMs > 0 && segmentIdleTime > _idleTimeoutMs;
   }
 
-  protected long now() {
-    return System.currentTimeMillis();
-  }
-
   @Override
   protected boolean isSegmentCaughtUp(String segmentName, 
RealtimeSegmentDataManager rtSegmentDataManager,
       RealtimeTableDataManager realtimeTableDataManager) {
-    long now = now();
-    long latestIngestionTimestamp =
-        
rtSegmentDataManager.getSegment().getSegmentMetadata().getLatestIngestionTimestamp();
-    long freshnessMs = now - latestIngestionTimestamp;
+    SegmentMetadata segmentMetadata = 
rtSegmentDataManager.getSegment().getSegmentMetadata();
+    long minimumIngestionLagMs = segmentMetadata.getMinimumIngestionLagMs();
 
-    // We check latestIngestionTimestamp >= 0 because the default freshness 
when unknown is Long.MIN_VALUE
-    if (latestIngestionTimestamp >= 0 && freshnessMs <= _minFreshnessMs) {
-      _logger.info("Segment {} with freshness {}ms has caught up within min 
freshness {}", segmentName, freshnessMs,
-          _minFreshnessMs);
+    // Simple freshness check - if minimum lag ever seen is within threshold, 
we're caught up.
+    // Note: default value is Long.MAX_VALUE when unknown, which will 
correctly fail this check.
+    if (minimumIngestionLagMs <= _minFreshnessMs) {
+      _logger.info("Segment {} with minimum ingestion lag {}ms has caught up 
within min freshness {}ms",
+          segmentName, minimumIngestionLagMs, _minFreshnessMs);
       return true;
     }
 
-    // For stream partitions that see very low volume, it's possible we're 
already caught up but the oldest
+    // Fallback: Check offset catchup.
     // message is too old to pass the freshness check. We check this condition 
separately to avoid hitting
     // the stream consumer to check partition count if we're already caught up.
     StreamPartitionMsgOffset currentOffset = 
rtSegmentDataManager.getCurrentOffset();
@@ -89,26 +88,25 @@ public class FreshnessBasedConsumptionStatusChecker extends 
IngestionBasedConsum
           segmentName, e);
     }
 
-    // Check if we're caught up (isOffsetCaughtUp handles null 
latestStreamOffset by returning false)
     if (isOffsetCaughtUp(segmentName, currentOffset, latestStreamOffset)) {
-      _logger.info("Segment {} with freshness {}ms has not caught up within 
min freshness {}. "
-              + "But the current ingested offset is equal to the latest 
available offset {}.", segmentName, freshnessMs,
-          _minFreshnessMs, currentOffset);
+      _logger.info("Segment {} with minimum ingestion lag {}ms has not caught 
up within min freshness {}ms. "
+              + "But the current ingested offset is equal to the latest 
available offset {}.",
+          segmentName, minimumIngestionLagMs, _minFreshnessMs, currentOffset);
       return true;
     }
 
+    // Fallback: Check idle timeout.
     long idleTimeMs = rtSegmentDataManager.getTimeSinceEventLastConsumedMs();
     if (segmentHasBeenIdleLongerThanThreshold(idleTimeMs)) {
-      _logger.warn("Segment {} with freshness {}ms has not caught up within 
min freshness {}. "
-              + "But the current ingested offset {} has been idle for {}ms. At 
offset {}. " + "Latest offset {}.",
-          segmentName, freshnessMs, _minFreshnessMs, currentOffset, 
idleTimeMs, currentOffset, latestStreamOffset);
+      _logger.warn("Segment {} with minimum ingestion lag {}ms has not caught 
up within min freshness {}ms. "
+              + "But has been idle for {}ms. At offset {}. Latest offset {}.",
+          segmentName, minimumIngestionLagMs, _minFreshnessMs, idleTimeMs, 
currentOffset, latestStreamOffset);
       return true;
     }
 
-    _logger.info("Segment {} with freshness {}ms has not caught up within "
-            + "min freshness {}. At offset {}. Latest offset {}.", 
segmentName, freshnessMs, _minFreshnessMs,
-        currentOffset,
-        latestStreamOffset);
+    _logger.info("Segment {} with minimum ingestion lag {}ms has not caught up 
within min freshness {}ms. "
+            + "At offset {}. Latest offset {}. Idle time ms {}",
+        segmentName, minimumIngestionLagMs, _minFreshnessMs, currentOffset, 
latestStreamOffset, idleTimeMs);
     return false;
   }
 }
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index a235b1d4382..17c98b0f812 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
@@ -45,21 +44,13 @@ import static org.testng.Assert.assertEquals;
 
 public class FreshnessBasedConsumptionStatusCheckerTest {
 
-  private class FakeFreshnessBasedConsumptionStatusChecker extends 
FreshnessBasedConsumptionStatusChecker {
-
-    private final long _now;
-
-    public FakeFreshnessBasedConsumptionStatusChecker(InstanceDataManager 
instanceDataManager,
-        Map<String, Set<String>> consumingSegments, Function<String, 
Set<String>> consumingSegmentsSupplier,
-        long minFreshnessMs, long idleTimeoutMs, long now) {
-      super(instanceDataManager, consumingSegments, consumingSegmentsSupplier, 
minFreshnessMs, idleTimeoutMs);
-      _now = now;
-    }
-
-    @Override
-    protected long now() {
-      return _now;
-    }
+  private void setupMinimumIngestionLag(RealtimeSegmentDataManager 
segmentDataManager,
+      long minimumIngestionLagMs) {
+    MutableSegment mockSegment = mock(MutableSegment.class);
+    SegmentMetadata mockSegmentMetdata = mock(SegmentMetadata.class);
+    when(mockSegment.getSegmentMetadata()).thenReturn(mockSegmentMetdata);
+    
when(mockSegmentMetdata.getMinimumIngestionLagMs()).thenReturn(minimumIngestionLagMs);
+    when(segmentDataManager.getSegment()).thenReturn(mockSegment);
   }
 
   @Test
@@ -92,20 +83,17 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
     when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
 
-    MutableSegment mockSegment = mock(MutableSegment.class);
-    SegmentMetadata mockSegmentMetdata = mock(SegmentMetadata.class);
-    when(mockSegment.getSegmentMetadata()).thenReturn(mockSegmentMetdata);
-    when(mockSegmentMetdata.getLatestIngestionTimestamp()).thenReturn(0L);
+    // mock minimum ingestion lag higher than the threshold so offset catchup 
is used
+    setupMinimumIngestionLag(segMngrA0, Long.MAX_VALUE);
+    setupMinimumIngestionLag(segMngrA1, Long.MAX_VALUE);
+    setupMinimumIngestionLag(segMngrB0, Long.MAX_VALUE);
 
-    //              current offset          latest stream offset    current 
time    last ingestion time
-    // segA0              15                       20                     now  
             0
-    // segA1              150                      200                    now  
             0
-    // segB0              1500                     2000                   now  
             0
-    when(segMngrA0.getSegment()).thenReturn(mockSegment);
+    //              current offset          latest stream offset    minimum 
ingestion lag
+    // segA0              15                       20                     
MAX_VALUE
+    // segA1              150                      200                    
MAX_VALUE
+    // segB0              1500                     2000                   
MAX_VALUE
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(15));
-    when(segMngrA1.getSegment()).thenReturn(mockSegment);
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(150));
-    when(segMngrB0.getSegment()).thenReturn(mockSegment);
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1500));
 
     StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
@@ -119,9 +107,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
-    when(segMngrA0.getSegmentName()).thenReturn(segA0);
-    when(segMngrA1.getSegmentName()).thenReturn(segA1);
-    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
     when(segA0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -132,20 +117,20 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
 
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
 
-    //              current offset          latest stream offset    current 
time    last ingestion time
-    // segA0              20                       20                     now  
             0
-    // segA1              300                      200                    now  
             0
-    // segB0              1998                     2000                   now  
             0
+    //              current offset          latest stream offset
+    // segA0              20                       20
+    // segA1              300                      200
+    // segB0              1998                     2000
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
     // The unexpected case where currentOffset > latestOffset
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(300));
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1998));
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 1);
 
-    //              current offset          latest stream offset    current 
time    last ingestion time
-    // segA0              20                       20                     100  
             0
-    // segA1              200                      200                    100  
             0
-    // segB0              2000                     2000                   100  
             0
+    //              current offset          latest stream offset
+    // segA0              20                       20
+    // segA1              200                      200
+    // segB0              2000                     2000
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000));
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
   }
@@ -180,17 +165,16 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
     
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
-    when(segMngrA0.getSegmentName()).thenReturn(segA0);
 
     when(segA0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
 
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
-    // ensure negative values are ignored
-    setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
+    // ensure high lag values are handled - offset catchup will be used
+    setupMinimumIngestionLag(segMngrA0, Long.MAX_VALUE);
 
-    //              current offset          latest stream offset    current 
time    last ingestion time
-    // segA0              0                       20                     100   
            Long.MIN_VALUE
+    //              current offset          latest stream offset    minimum 
ingestion lag
+    // segA0              0                       20                     
MAX_VALUE
     // segA1 (segment is absent)
     // segB0 (table is absent)
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -205,15 +189,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
   }
 
-  private void setupLatestIngestionTimestamp(RealtimeSegmentDataManager 
segmentDataManager,
-      long latestIngestionTimestamp) {
-    MutableSegment mockSegment = mock(MutableSegment.class);
-    SegmentMetadata mockSegmentMetdata = mock(SegmentMetadata.class);
-    when(mockSegment.getSegmentMetadata()).thenReturn(mockSegmentMetdata);
-    
when(mockSegmentMetdata.getLatestIngestionTimestamp()).thenReturn(latestIngestionTimestamp);
-    when(segmentDataManager.getSegment()).thenReturn(mockSegment);
-  }
-
   @Test
   public void regularCaseWithFreshnessCatchup() {
     String segA0 = "tableA__0__0__123Z";
@@ -223,9 +198,10 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
     consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    long minFreshnessMs = 10L;
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
-            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L, 100L);
+        new FreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 
minFreshnessMs, 0L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -255,9 +231,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
-    when(segMngrA0.getSegmentName()).thenReturn(segA0);
-    when(segMngrA1.getSegmentName()).thenReturn(segA1);
-    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
     when(segA0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -268,32 +241,32 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
-    // ensure negative values are ignored
-    setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
-    setupLatestIngestionTimestamp(segMngrA1, -1L);
-    setupLatestIngestionTimestamp(segMngrB0, 0L);
-
-    //              current offset          latest stream offset    current 
time    last ingestion time
-    // segA0              0                       20                     100   
            Long.MIN_VALUE
-    // segA1              0                       200                    100   
            -1
-    // segB0              0                       2000                   100   
            0
+
+    // High minimum ingestion lag - segments not caught up
+    setupMinimumIngestionLag(segMngrA0, 200L);
+    setupMinimumIngestionLag(segMngrA1, 200L);
+    setupMinimumIngestionLag(segMngrB0, 200L);
+
+    //              minimum ingestion lag     minFreshnessMs
+    // segA0              200                      10
+    // segA1              200                      10
+    // segB0              200                      10
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
 
-    //              current offset          latest stream offset    current 
time    last ingestion time
-    // segA0              0                       20                     100   
            90
-    // segA1              0                       200                    100   
            101
-    // segB0              0                       2000                   100   
            50
-    setupLatestIngestionTimestamp(segMngrA0, 90L);
-    // Unexpected case where latest ingested is somehow after current time
-    setupLatestIngestionTimestamp(segMngrA1, 101L);
-    setupLatestIngestionTimestamp(segMngrB0, 89L);
+    //              minimum ingestion lag     minFreshnessMs
+    // segA0              5                       10  (caught up)
+    // segA1              10                      10  (caught up - exactly at 
threshold)
+    // segB0              15                      10  (not caught up)
+    setupMinimumIngestionLag(segMngrA0, 5L);
+    setupMinimumIngestionLag(segMngrA1, 10L);
+    setupMinimumIngestionLag(segMngrB0, 15L);
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 1);
 
-    //              current offset          latest stream offset    current 
time    last ingestion time
-    // segA0              20                       20                     100  
             90
-    // segA1              200                      200                    100  
             101
-    // segB0              1999                     2000                   100  
             95
-    setupLatestIngestionTimestamp(segMngrB0, 95L);
+    //              minimum ingestion lag     minFreshnessMs
+    // segA0              5                       10  (caught up)
+    // segA1              10                      10  (caught up)
+    // segB0              8                       10  (caught up)
+    setupMinimumIngestionLag(segMngrB0, 8L);
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
   }
 
@@ -308,8 +281,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     long idleTimeoutMs = 10L;
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
-            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
idleTimeoutMs, 100L);
+        new FreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
idleTimeoutMs);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -339,9 +312,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
-    when(segMngrA0.getSegmentName()).thenReturn(segA0);
-    when(segMngrA1.getSegmentName()).thenReturn(segA1);
-    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
     when(segA0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -353,10 +323,11 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
-    // ensure negative values are ignored
-    setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
-    setupLatestIngestionTimestamp(segMngrA1, -1L);
-    setupLatestIngestionTimestamp(segMngrB0, 0L);
+
+    // mock minimum ingestion lag higher than the threshold so idle timeout is 
used
+    setupMinimumIngestionLag(segMngrA0, 200L);
+    setupMinimumIngestionLag(segMngrA1, 200L);
+    setupMinimumIngestionLag(segMngrB0, 200L);
 
     when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
     when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
@@ -403,8 +374,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
-            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L, 100L);
+        new FreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -434,9 +405,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
-    when(segMngrA0.getSegmentName()).thenReturn(segA0);
-    when(segMngrA1.getSegmentName()).thenReturn(segA1);
-    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
     when(segA0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -448,10 +416,10 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
-    // ensure negative values are ignored
-    setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
-    setupLatestIngestionTimestamp(segMngrA1, -1L);
-    setupLatestIngestionTimestamp(segMngrB0, 0L);
+    // minimum ingestion lag is higher than threshold
+    setupMinimumIngestionLag(segMngrA0, 200L);
+    setupMinimumIngestionLag(segMngrA1, 200L);
+    setupMinimumIngestionLag(segMngrB0, 200L);
 
     when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
     when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
@@ -474,8 +442,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
-            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L, 100L);
+        new FreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -505,9 +473,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
-    when(segMngrA0.getSegmentName()).thenReturn(segA0);
-    when(segMngrA1.getSegmentName()).thenReturn(segA1);
-    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
     when(segA0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -519,28 +484,27 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
-    // ensure negative values are ignored
-    setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
-    setupLatestIngestionTimestamp(segMngrA1, -1L);
-    setupLatestIngestionTimestamp(segMngrB0, 0L);
-
-    //              current offset          latest stream offset    current 
time    last ingestion time
-    // segA0              0                       20                     100   
            Long.MIN_VALUE
-    // segA1              0                       200                    100   
            -1
-    // segB0              0                       2000                   100   
            0
+    // minimum ingestion lag is higher than threshold
+    setupMinimumIngestionLag(segMngrA0, 200L);
+    setupMinimumIngestionLag(segMngrA1, 200L);
+    setupMinimumIngestionLag(segMngrB0, 200L);
+
+    //              minimum ingestion lag     minFreshnessMs
+    // segA0              200                      10
+    // segA1              200                      10
+    // segB0              200                      10
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
 
     // segB0 is now committed; ImmutableSegmentDataManager is returned by 
table data manager
     ImmutableSegmentDataManager immSegMngrB0 = 
mock(ImmutableSegmentDataManager.class);
     when(tableDataManagerB.acquireSegment(segB0)).thenReturn(immSegMngrB0);
 
-    //              current offset          latest stream offset    current 
time    last ingestion time
-    // segA0              0                       20                     100   
            90
-    // segA1              0                       200                    100   
            101
+    //              minimum ingestion lag     minFreshnessMs
+    // segA0              5                       10  (caught up)
+    // segA1              0                       10  (caught up)
     // segB0              already committed
-    setupLatestIngestionTimestamp(segMngrA0, 90L);
-    // Unexpected case where latest ingested is somehow after current time
-    setupLatestIngestionTimestamp(segMngrA1, 101L);
+    setupMinimumIngestionLag(segMngrA0, 5L);
+    setupMinimumIngestionLag(segMngrA1, 0L);
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 1);
     consumingSegments.get("tableB_REALTIME").remove(segB0);
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
@@ -556,8 +520,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
-            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L, 100L);
+        new FreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
0L);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 3);
@@ -587,9 +551,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
     when(segMngrB0.getStreamPartitionId()).thenReturn(0);
-    when(segMngrA0.getSegmentName()).thenReturn(segA0);
-    when(segMngrA1.getSegmentName()).thenReturn(segA1);
-    when(segMngrB0.getSegmentName()).thenReturn(segB0);
 
     when(segA0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -601,24 +562,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     when(segMngrA0.getCurrentOffset()).thenReturn(null);
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
     when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
-    // ensure negative values are ignored
-    setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
-    setupLatestIngestionTimestamp(segMngrA1, 90L);
-    setupLatestIngestionTimestamp(segMngrB0, 0L);
-
-    //              current offset          latest stream offset    current 
time    last ingestion time
-    // segA0              null                    20                     100   
            Long.MIN_VALUE
-    // segA1              0                       200                    100   
            90
-    // segB0              0                       null                   100   
            0
+    // segA0 and segB0 have unknown minimum lag, segA1 has good lag (caught up)
+    setupMinimumIngestionLag(segMngrA0, Long.MAX_VALUE);
+    setupMinimumIngestionLag(segMngrA1, 5L);
+    setupMinimumIngestionLag(segMngrB0, Long.MAX_VALUE);
+
+    //              current offset          latest stream offset    minimum 
ingestion lag
+    // segA0              null                    20                     
MAX_VALUE (unknown)
+    // segA1              0                       200                    5 
(caught up)
+    // segB0              0                       null                   
MAX_VALUE (unknown)
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 2);
 
-    //              current offset          latest stream offset    current 
time    last ingestion time
-    // segA0              20                      20                     100   
            89
-    // segA1              0                       200                    100   
            90
-    // segB0              0                       0                      100   
            0
-    setupLatestIngestionTimestamp(segMngrA0, 89L);
+    //              current offset          latest stream offset    minimum 
ingestion lag
+    // segA0              20                      20                     
MAX_VALUE (offset caught up)
+    // segA1              0                       200                    5 
(caught up)
+    // segB0              0                       0                      
MAX_VALUE (offset caught up)
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
-    when(segB0Provider.supportsOffsetLag()).thenReturn(true);
     when(segB0Provider.fetchLatestStreamOffset(anySet(), 
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(0)));
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 0);
   }
@@ -632,8 +591,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
     long idleTimeoutMs = 10L;
     FreshnessBasedConsumptionStatusChecker statusChecker =
-        new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
-            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
idleTimeoutMs, 100L);
+        new FreshnessBasedConsumptionStatusChecker(instanceDataManager, 
consumingSegments,
+            
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L, 
idleTimeoutMs);
 
     // TableDataManager is not set up yet
     
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
 2);
@@ -656,8 +615,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
 
     when(segMngrA0.getStreamPartitionId()).thenReturn(0);
     when(segMngrA1.getStreamPartitionId()).thenReturn(1);
-    when(segMngrA0.getSegmentName()).thenReturn(segA0);
-    when(segMngrA1.getSegmentName()).thenReturn(segA1);
 
     when(segA0Provider.supportsOffsetLag()).thenReturn(true);
     when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -671,9 +628,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
 
     when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
     when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
-    // ensure negative values are ignored
-    setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
-    setupLatestIngestionTimestamp(segMngrA1, Long.MIN_VALUE);
+    // minimum ingestion lag is higher than threshold
+    setupMinimumIngestionLag(segMngrA0, Long.MAX_VALUE);
+    setupMinimumIngestionLag(segMngrA1, Long.MAX_VALUE);
 
     // segA0 has idle time below threshold, segA1 has idle time above threshold
     when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs 
- 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to