This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 08edbc98106 freshness checkers uses minimum ingestion lag (#17598)
08edbc98106 is described below
commit 08edbc98106af3b67bc46461684ee42bdb51818c
Author: Johan Adami <[email protected]>
AuthorDate: Mon Feb 9 20:09:27 2026 -0500
freshness checkers uses minimum ingestion lag (#17598)
---
.../indexsegment/mutable/MutableSegmentImpl.java | 10 +
.../apache/pinot/segment/spi/SegmentMetadata.java | 9 +
.../spi/index/metadata/SegmentMetadataImpl.java | 5 +
.../FreshnessBasedConsumptionStatusChecker.java | 50 ++---
...FreshnessBasedConsumptionStatusCheckerTest.java | 245 +++++++++------------
5 files changed, 149 insertions(+), 170 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 2e8d9bb4bf5..43150d61c59 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -191,6 +191,7 @@ public class MutableSegmentImpl implements MutableSegment {
// default message metadata
private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;
+ private volatile long _minimumIngestionLagMs = Long.MAX_VALUE;
// multi-column text index fields
private final MultiColumnRealtimeLuceneTextIndex _multiColumnTextIndex;
@@ -224,6 +225,11 @@ public class MutableSegmentImpl implements MutableSegment {
return _latestIngestionTimeMs;
}
+ @Override
+ public long getMinimumIngestionLagMs() {
+ return _minimumIngestionLagMs;
+ }
+
@Override
public boolean isMutableSegment() {
return true;
@@ -701,6 +707,10 @@ public class MutableSegmentImpl implements MutableSegment {
_lastIndexedTimeMs = System.currentTimeMillis();
if (metadata != null) {
_latestIngestionTimeMs = Math.max(_latestIngestionTimeMs,
metadata.getRecordIngestionTimeMs());
+ // if for some reason the record ingestion time is in the future, we
should not
+ // update the minimum ingestion lag past 0
+ long ingestionLagMs = Math.max(0, _lastIndexedTimeMs -
_latestIngestionTimeMs);
+ _minimumIngestionLagMs = Math.min(_minimumIngestionLagMs,
ingestionLagMs);
}
return canTakeMore;
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
index e76a540c975..eff83442d2e 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/SegmentMetadata.java
@@ -95,6 +95,15 @@ public interface SegmentMetadata {
*/
long getLatestIngestionTimestamp();
+ /**
+ * Return the minimum ingestion lag recorded for this segment. Ingestion lag
is
+ * the difference between the record ingestion timestamp and current system
time.
+ * Applicable for MutableSegments.
+ *
+ * @return minimum ingestion lag recorded for this segment
+ */
+ long getMinimumIngestionLagMs();
+
@Nullable
List<StarTreeV2Metadata> getStarTreeV2MetadataList();
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
index 5a79535fead..1f3a0c9a52e 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java
@@ -438,6 +438,11 @@ public class SegmentMetadataImpl implements SegmentMetadata {
return Long.MIN_VALUE;
}
+ @Override
+ public long getMinimumIngestionLagMs() {
+ return Long.MAX_VALUE;
+ }
+
@Nullable
@Override
public List<StarTreeV2Metadata> getStarTreeV2MetadataList() {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 5a02a91f2dd..d7d97da7212 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -26,6 +26,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentMetadataUtils;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -35,8 +36,9 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
* execution happens and consumers try to catch up to the latest messages
available in streams.
* To achieve this, every time status check is called - {@link
#getNumConsumingSegmentsNotReachedIngestionCriteria} -
* for each consuming segment, we check if either:
- * - the segment's latest ingested offset has reached the current stream
offset that's
- * - the last ingested message is within {@link #_minFreshnessMs} of the
current system time
+ * - the segment's minimum ingestion lag is within {@link #_minFreshnessMs}
+ * - the segment's latest ingested offset has reached the current stream
offset
+ * - the segment has been idle longer than {@link #_idleTimeoutMs}
*/
public class FreshnessBasedConsumptionStatusChecker extends
IngestionBasedConsumptionStatusChecker {
private final long _minFreshnessMs;
@@ -48,32 +50,29 @@ public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsum
super(instanceDataManager, consumingSegments, consumingSegmentsSupplier);
_minFreshnessMs = minFreshnessMs;
_idleTimeoutMs = idleTimeoutMs;
+ _logger.info("FreshnessBasedConsumptionStatusChecker initialized with
min_freshness={}ms, idle_timeout={}ms",
+ _minFreshnessMs, _idleTimeoutMs);
}
private boolean segmentHasBeenIdleLongerThanThreshold(long segmentIdleTime) {
return _idleTimeoutMs > 0 && segmentIdleTime > _idleTimeoutMs;
}
- protected long now() {
- return System.currentTimeMillis();
- }
-
@Override
protected boolean isSegmentCaughtUp(String segmentName,
RealtimeSegmentDataManager rtSegmentDataManager,
RealtimeTableDataManager realtimeTableDataManager) {
- long now = now();
- long latestIngestionTimestamp =
-
rtSegmentDataManager.getSegment().getSegmentMetadata().getLatestIngestionTimestamp();
- long freshnessMs = now - latestIngestionTimestamp;
+ SegmentMetadata segmentMetadata =
rtSegmentDataManager.getSegment().getSegmentMetadata();
+ long minimumIngestionLagMs = segmentMetadata.getMinimumIngestionLagMs();
- // We check latestIngestionTimestamp >= 0 because the default freshness
when unknown is Long.MIN_VALUE
- if (latestIngestionTimestamp >= 0 && freshnessMs <= _minFreshnessMs) {
- _logger.info("Segment {} with freshness {}ms has caught up within min
freshness {}", segmentName, freshnessMs,
- _minFreshnessMs);
+ // Simple freshness check - if minimum lag ever seen is within threshold,
we're caught up.
+ // Note: default value is Long.MAX_VALUE when unknown, which will
correctly fail this check.
+ if (minimumIngestionLagMs <= _minFreshnessMs) {
+ _logger.info("Segment {} with minimum ingestion lag {}ms has caught up
within min freshness {}ms",
+ segmentName, minimumIngestionLagMs, _minFreshnessMs);
return true;
}
- // For stream partitions that see very low volume, it's possible we're
already caught up but the oldest
+ // Fallback: Check offset catchup.
// message is too old to pass the freshness check. We check this condition
separately to avoid hitting
// the stream consumer to check partition count if we're already caught up.
StreamPartitionMsgOffset currentOffset =
rtSegmentDataManager.getCurrentOffset();
@@ -89,26 +88,25 @@ public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsum
segmentName, e);
}
- // Check if we're caught up (isOffsetCaughtUp handles null
latestStreamOffset by returning false)
if (isOffsetCaughtUp(segmentName, currentOffset, latestStreamOffset)) {
- _logger.info("Segment {} with freshness {}ms has not caught up within
min freshness {}. "
- + "But the current ingested offset is equal to the latest
available offset {}.", segmentName, freshnessMs,
- _minFreshnessMs, currentOffset);
+ _logger.info("Segment {} with minimum ingestion lag {}ms has not caught
up within min freshness {}ms. "
+ + "But the current ingested offset is equal to the latest
available offset {}.",
+ segmentName, minimumIngestionLagMs, _minFreshnessMs, currentOffset);
return true;
}
+ // Fallback: Check idle timeout.
long idleTimeMs = rtSegmentDataManager.getTimeSinceEventLastConsumedMs();
if (segmentHasBeenIdleLongerThanThreshold(idleTimeMs)) {
- _logger.warn("Segment {} with freshness {}ms has not caught up within
min freshness {}. "
- + "But the current ingested offset {} has been idle for {}ms. At
offset {}. " + "Latest offset {}.",
- segmentName, freshnessMs, _minFreshnessMs, currentOffset,
idleTimeMs, currentOffset, latestStreamOffset);
+ _logger.warn("Segment {} with minimum ingestion lag {}ms has not caught
up within min freshness {}ms. "
+ + "But has been idle for {}ms. At offset {}. Latest offset {}.",
+ segmentName, minimumIngestionLagMs, _minFreshnessMs, idleTimeMs,
currentOffset, latestStreamOffset);
return true;
}
- _logger.info("Segment {} with freshness {}ms has not caught up within "
- + "min freshness {}. At offset {}. Latest offset {}.",
segmentName, freshnessMs, _minFreshnessMs,
- currentOffset,
- latestStreamOffset);
+ _logger.info("Segment {} with minimum ingestion lag {}ms has not caught up
within min freshness {}ms. "
+ + "At offset {}. Latest offset {}. Idle time ms {}",
+ segmentName, minimumIngestionLagMs, _minFreshnessMs, currentOffset,
latestStreamOffset, idleTimeMs);
return false;
}
}
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index a235b1d4382..17c98b0f812 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
@@ -45,21 +44,13 @@ import static org.testng.Assert.assertEquals;
public class FreshnessBasedConsumptionStatusCheckerTest {
- private class FakeFreshnessBasedConsumptionStatusChecker extends
FreshnessBasedConsumptionStatusChecker {
-
- private final long _now;
-
- public FakeFreshnessBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
- Map<String, Set<String>> consumingSegments, Function<String,
Set<String>> consumingSegmentsSupplier,
- long minFreshnessMs, long idleTimeoutMs, long now) {
- super(instanceDataManager, consumingSegments, consumingSegmentsSupplier,
minFreshnessMs, idleTimeoutMs);
- _now = now;
- }
-
- @Override
- protected long now() {
- return _now;
- }
+ private void setupMinimumIngestionLag(RealtimeSegmentDataManager
segmentDataManager,
+ long minimumIngestionLagMs) {
+ MutableSegment mockSegment = mock(MutableSegment.class);
+ SegmentMetadata mockSegmentMetdata = mock(SegmentMetadata.class);
+ when(mockSegment.getSegmentMetadata()).thenReturn(mockSegmentMetdata);
+
when(mockSegmentMetdata.getMinimumIngestionLagMs()).thenReturn(minimumIngestionLagMs);
+ when(segmentDataManager.getSegment()).thenReturn(mockSegment);
}
@Test
@@ -92,20 +83,17 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
- MutableSegment mockSegment = mock(MutableSegment.class);
- SegmentMetadata mockSegmentMetdata = mock(SegmentMetadata.class);
- when(mockSegment.getSegmentMetadata()).thenReturn(mockSegmentMetdata);
- when(mockSegmentMetdata.getLatestIngestionTimestamp()).thenReturn(0L);
+ // mock minimum ingestion lag higher than the threshold so offset catchup
is used
+ setupMinimumIngestionLag(segMngrA0, Long.MAX_VALUE);
+ setupMinimumIngestionLag(segMngrA1, Long.MAX_VALUE);
+ setupMinimumIngestionLag(segMngrB0, Long.MAX_VALUE);
- // current offset latest stream offset current
time last ingestion time
- // segA0 15 20 now
0
- // segA1 150 200 now
0
- // segB0 1500 2000 now
0
- when(segMngrA0.getSegment()).thenReturn(mockSegment);
+ // current offset latest stream offset minimum
ingestion lag
+ // segA0 15 20
MAX_VALUE
+ // segA1 150 200
MAX_VALUE
+ // segB0 1500 2000
MAX_VALUE
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(15));
- when(segMngrA1.getSegment()).thenReturn(mockSegment);
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(150));
- when(segMngrB0.getSegment()).thenReturn(mockSegment);
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1500));
StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
@@ -119,9 +107,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
- when(segMngrA0.getSegmentName()).thenReturn(segA0);
- when(segMngrA1.getSegmentName()).thenReturn(segA1);
- when(segMngrB0.getSegmentName()).thenReturn(segB0);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -132,20 +117,20 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
- // current offset latest stream offset current
time last ingestion time
- // segA0 20 20 now
0
- // segA1 300 200 now
0
- // segB0 1998 2000 now
0
+ // current offset latest stream offset
+ // segA0 20 20
+ // segA1 300 200
+ // segB0 1998 2000
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
// The unexpected case where currentOffset > latestOffset
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(300));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(1998));
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
1);
- // current offset latest stream offset current
time last ingestion time
- // segA0 20 20 100
0
- // segA1 200 200 100
0
- // segB0 2000 2000 100
0
+ // current offset latest stream offset
+ // segA0 20 20
+ // segA1 200 200
+ // segB0 2000 2000
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(2000));
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
}
@@ -180,17 +165,16 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
StreamMetadataProvider segA0Provider = mock(StreamMetadataProvider.class);
when(tableDataManagerA.getStreamMetadataProvider(segMngrA0)).thenReturn(segA0Provider);
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
- when(segMngrA0.getSegmentName()).thenReturn(segA0);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(20)));
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
- // ensure negative values are ignored
- setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
+ // ensure high lag values are handled - offset catchup will be used
+ setupMinimumIngestionLag(segMngrA0, Long.MAX_VALUE);
- // current offset latest stream offset current
time last ingestion time
- // segA0 0 20 100
Long.MIN_VALUE
+ // current offset latest stream offset minimum
ingestion lag
+ // segA0 0 20
MAX_VALUE
// segA1 (segment is absent)
// segB0 (table is absent)
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -205,15 +189,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
}
- private void setupLatestIngestionTimestamp(RealtimeSegmentDataManager
segmentDataManager,
- long latestIngestionTimestamp) {
- MutableSegment mockSegment = mock(MutableSegment.class);
- SegmentMetadata mockSegmentMetdata = mock(SegmentMetadata.class);
- when(mockSegment.getSegmentMetadata()).thenReturn(mockSegmentMetdata);
-
when(mockSegmentMetdata.getLatestIngestionTimestamp()).thenReturn(latestIngestionTimestamp);
- when(segmentDataManager.getSegment()).thenReturn(mockSegment);
- }
-
@Test
public void regularCaseWithFreshnessCatchup() {
String segA0 = "tableA__0__0__123Z";
@@ -223,9 +198,10 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
consumingSegments.put("tableA_REALTIME", ImmutableSet.of(segA0, segA1));
consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+ long minFreshnessMs = 10L;
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
-
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L, 100L);
+ new FreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments),
minFreshnessMs, 0L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -255,9 +231,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
- when(segMngrA0.getSegmentName()).thenReturn(segA0);
- when(segMngrA1.getSegmentName()).thenReturn(segA1);
- when(segMngrB0.getSegmentName()).thenReturn(segB0);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -268,32 +241,32 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
- // ensure negative values are ignored
- setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
- setupLatestIngestionTimestamp(segMngrA1, -1L);
- setupLatestIngestionTimestamp(segMngrB0, 0L);
-
- // current offset latest stream offset current
time last ingestion time
- // segA0 0 20 100
Long.MIN_VALUE
- // segA1 0 200 100
-1
- // segB0 0 2000 100
0
+
+ // High minimum ingestion lag - segments not caught up
+ setupMinimumIngestionLag(segMngrA0, 200L);
+ setupMinimumIngestionLag(segMngrA1, 200L);
+ setupMinimumIngestionLag(segMngrB0, 200L);
+
+ // minimum ingestion lag minFreshnessMs
+ // segA0 200 10
+ // segA1 200 10
+ // segB0 200 10
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
- // current offset latest stream offset current
time last ingestion time
- // segA0 0 20 100
90
- // segA1 0 200 100
101
- // segB0 0 2000 100
50
- setupLatestIngestionTimestamp(segMngrA0, 90L);
- // Unexpected case where latest ingested is somehow after current time
- setupLatestIngestionTimestamp(segMngrA1, 101L);
- setupLatestIngestionTimestamp(segMngrB0, 89L);
+ // minimum ingestion lag minFreshnessMs
+ // segA0 5 10 (caught up)
+ // segA1 10 10 (caught up - exactly at
threshold)
+ // segB0 15 10 (not caught up)
+ setupMinimumIngestionLag(segMngrA0, 5L);
+ setupMinimumIngestionLag(segMngrA1, 10L);
+ setupMinimumIngestionLag(segMngrB0, 15L);
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
1);
- // current offset latest stream offset current
time last ingestion time
- // segA0 20 20 100
90
- // segA1 200 200 100
101
- // segB0 1999 2000 100
95
- setupLatestIngestionTimestamp(segMngrB0, 95L);
+ // minimum ingestion lag minFreshnessMs
+ // segA0 5 10 (caught up)
+ // segA1 10 10 (caught up)
+ // segB0 8 10 (caught up)
+ setupMinimumIngestionLag(segMngrB0, 8L);
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
}
@@ -308,8 +281,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
long idleTimeoutMs = 10L;
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
-
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
idleTimeoutMs, 100L);
+ new FreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
idleTimeoutMs);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -339,9 +312,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
- when(segMngrA0.getSegmentName()).thenReturn(segA0);
- when(segMngrA1.getSegmentName()).thenReturn(segA1);
- when(segMngrB0.getSegmentName()).thenReturn(segB0);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -353,10 +323,11 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
- // ensure negative values are ignored
- setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
- setupLatestIngestionTimestamp(segMngrA1, -1L);
- setupLatestIngestionTimestamp(segMngrB0, 0L);
+
+ // mock minimum ingestion lag higher than the threshold so idle timeout is
used
+ setupMinimumIngestionLag(segMngrA0, 200L);
+ setupMinimumIngestionLag(segMngrA1, 200L);
+ setupMinimumIngestionLag(segMngrB0, 200L);
when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
@@ -403,8 +374,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
-
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L, 100L);
+ new FreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -434,9 +405,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
- when(segMngrA0.getSegmentName()).thenReturn(segA0);
- when(segMngrA1.getSegmentName()).thenReturn(segA1);
- when(segMngrB0.getSegmentName()).thenReturn(segB0);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -448,10 +416,10 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
- // ensure negative values are ignored
- setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
- setupLatestIngestionTimestamp(segMngrA1, -1L);
- setupLatestIngestionTimestamp(segMngrB0, 0L);
+ // minimum ingestion lag is higher than threshold
+ setupMinimumIngestionLag(segMngrA0, 200L);
+ setupMinimumIngestionLag(segMngrA1, 200L);
+ setupMinimumIngestionLag(segMngrB0, 200L);
when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
@@ -474,8 +442,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
-
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L, 100L);
+ new FreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -505,9 +473,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
- when(segMngrA0.getSegmentName()).thenReturn(segA0);
- when(segMngrA1.getSegmentName()).thenReturn(segA1);
- when(segMngrB0.getSegmentName()).thenReturn(segB0);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -519,28 +484,27 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
- // ensure negative values are ignored
- setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
- setupLatestIngestionTimestamp(segMngrA1, -1L);
- setupLatestIngestionTimestamp(segMngrB0, 0L);
-
- // current offset latest stream offset current
time last ingestion time
- // segA0 0 20 100
Long.MIN_VALUE
- // segA1 0 200 100
-1
- // segB0 0 2000 100
0
+ // minimum ingestion lag is higher than threshold
+ setupMinimumIngestionLag(segMngrA0, 200L);
+ setupMinimumIngestionLag(segMngrA1, 200L);
+ setupMinimumIngestionLag(segMngrB0, 200L);
+
+ // minimum ingestion lag minFreshnessMs
+ // segA0 200 10
+ // segA1 200 10
+ // segB0 200 10
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
// segB0 is now committed; ImmutableSegmentDataManager is returned by
table data manager
ImmutableSegmentDataManager immSegMngrB0 =
mock(ImmutableSegmentDataManager.class);
when(tableDataManagerB.acquireSegment(segB0)).thenReturn(immSegMngrB0);
- // current offset latest stream offset current
time last ingestion time
- // segA0 0 20 100
90
- // segA1 0 200 100
101
+ // minimum ingestion lag minFreshnessMs
+ // segA0 5 10 (caught up)
+ // segA1 0 10 (caught up)
// segB0 already committed
- setupLatestIngestionTimestamp(segMngrA0, 90L);
- // Unexpected case where latest ingested is somehow after current time
- setupLatestIngestionTimestamp(segMngrA1, 101L);
+ setupMinimumIngestionLag(segMngrA0, 5L);
+ setupMinimumIngestionLag(segMngrA1, 0L);
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
1);
consumingSegments.get("tableB_REALTIME").remove(segB0);
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
@@ -556,8 +520,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
consumingSegments.put("tableB_REALTIME", ImmutableSet.of(segB0));
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
-
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L, 100L);
+ new FreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
0L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -587,9 +551,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
when(segMngrB0.getStreamPartitionId()).thenReturn(0);
- when(segMngrA0.getSegmentName()).thenReturn(segA0);
- when(segMngrA1.getSegmentName()).thenReturn(segA1);
- when(segMngrB0.getSegmentName()).thenReturn(segB0);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -601,24 +562,22 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getCurrentOffset()).thenReturn(null);
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
- // ensure negative values are ignored
- setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
- setupLatestIngestionTimestamp(segMngrA1, 90L);
- setupLatestIngestionTimestamp(segMngrB0, 0L);
-
- // current offset latest stream offset current
time last ingestion time
- // segA0 null 20 100
Long.MIN_VALUE
- // segA1 0 200 100
90
- // segB0 0 null 100
0
+ // segA0 and segB0 have unknown minimum lag, segA1 has good lag (caught up)
+ setupMinimumIngestionLag(segMngrA0, Long.MAX_VALUE);
+ setupMinimumIngestionLag(segMngrA1, 5L);
+ setupMinimumIngestionLag(segMngrB0, Long.MAX_VALUE);
+
+ // current offset latest stream offset minimum
ingestion lag
+ // segA0 null 20
MAX_VALUE (unknown)
+ // segA1 0 200 5
(caught up)
+ // segB0 0 null
MAX_VALUE (unknown)
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
2);
- // current offset latest stream offset current
time last ingestion time
- // segA0 20 20 100
89
- // segA1 0 200 100
90
- // segB0 0 0 100
0
- setupLatestIngestionTimestamp(segMngrA0, 89L);
+ // current offset latest stream offset minimum
ingestion lag
+ // segA0 20 20
MAX_VALUE (offset caught up)
+ // segA1 0 200 5
(caught up)
+ // segB0 0 0
MAX_VALUE (offset caught up)
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
- when(segB0Provider.supportsOffsetLag()).thenReturn(true);
when(segB0Provider.fetchLatestStreamOffset(anySet(),
anyLong())).thenReturn(Map.of(0, new LongMsgOffset(0)));
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
}
@@ -632,8 +591,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
long idleTimeoutMs = 10L;
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
-
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
idleTimeoutMs, 100L);
+ new FreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments,
+
ConsumptionStatusCheckerTestUtils.getConsumingSegments(consumingSegments), 10L,
idleTimeoutMs);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
2);
@@ -656,8 +615,6 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getStreamPartitionId()).thenReturn(0);
when(segMngrA1.getStreamPartitionId()).thenReturn(1);
- when(segMngrA0.getSegmentName()).thenReturn(segA0);
- when(segMngrA1.getSegmentName()).thenReturn(segA1);
when(segA0Provider.supportsOffsetLag()).thenReturn(true);
when(segA1Provider.supportsOffsetLag()).thenReturn(true);
@@ -671,9 +628,9 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
- // ensure negative values are ignored
- setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
- setupLatestIngestionTimestamp(segMngrA1, Long.MIN_VALUE);
+ // minimum ingestion lag is higher than threshold
+ setupMinimumIngestionLag(segMngrA0, Long.MAX_VALUE);
+ setupMinimumIngestionLag(segMngrA1, Long.MAX_VALUE);
// segA0 has idle time below threshold, segA1 has idle time above threshold
when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs
- 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]