This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new eec756e989d [Auto reset 1/3]Auto reset offset during ingestion lag
(#16492)
eec756e989d is described below
commit eec756e989d93c8d1806860cb75ea7f195de8186
Author: lnbest0707 <[email protected]>
AuthorDate: Fri Aug 15 14:05:20 2025 -0700
[Auto reset 1/3]Auto reset offset during ingestion lag (#16492)
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 77 ++++++++++-
.../PinotLLCRealtimeSegmentManagerTest.java | 148 +++++++++++++++++++++
.../kafka20/KafkaStreamMetadataProvider.java | 6 +
.../kafka30/KafkaStreamMetadataProvider.java | 7 +
.../org/apache/pinot/spi/stream/StreamConfig.java | 62 ++++++++-
.../pinot/spi/stream/StreamConfigProperties.java | 19 +++
.../pinot/spi/stream/StreamMetadataProvider.java | 6 +
7 files changed, 318 insertions(+), 7 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e0e5d291445..0d135dd37e3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -171,6 +171,8 @@ public class PinotLLCRealtimeSegmentManager {
// Max time to wait for all LLC segments to complete committing their
metadata while stopping the controller.
private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS =
30_000L;
+ // Timeout for calling stream metadata provider APIs
+ private static final long STREAM_FETCH_TIMEOUT_MS = 5_000L;
// TODO: make this configurable with default set to 10
/**
@@ -927,7 +929,11 @@ public class PinotLLCRealtimeSegmentManager {
int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
String segmentName = newLLCSegmentName.getSegmentName();
- String startOffset = committingSegmentDescriptor.getNextOffset();
+
+ // Handle offset auto reset
+ String nextOffset = committingSegmentDescriptor.getNextOffset();
+ String startOffset = computeStartOffset(
+ nextOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
LOGGER.info(
"Creating segment ZK metadata for new CONSUMING segment: {} with start
offset: {} and creation time: {}",
@@ -958,6 +964,65 @@ public class PinotLLCRealtimeSegmentManager {
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
+ private String computeStartOffset(String nextOffset, StreamConfig
streamConfig, int partitionId) {
+ if (!streamConfig.isEnableOffsetAutoReset()) {
+ return nextOffset;
+ }
+ long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+ int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+ if (timeThreshold <= 0 && offsetThreshold <= 0) {
+ LOGGER.warn("Invalid offset auto reset configuration for table: {},
topic: {}. "
+ + "timeThreshold: {}, offsetThreshold: {}",
+ streamConfig.getTableNameWithType(), streamConfig.getTopicName(),
timeThreshold, offsetThreshold);
+ return nextOffset;
+ }
+ String clientId = getTableTopicUniqueClientId(streamConfig);
+ StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ StreamPartitionMsgOffsetFactory offsetFactory =
consumerFactory.createStreamMsgOffsetFactory();
+ StreamPartitionMsgOffset nextOffsetWithType =
offsetFactory.create(nextOffset);
+ StreamPartitionMsgOffset offsetAtSLA = null;
+ StreamPartitionMsgOffset latestOffset;
+ try (StreamMetadataProvider metadataProvider =
consumerFactory.createPartitionMetadataProvider(clientId,
+ partitionId)) {
+ // Fetching timestamp from an offset is an expensive operation which
requires reading the data,
+ // while fetching offset from timestamp is lightweight and only needs to
read metadata.
+ // Hence, instead of checking if latestOffset's time - nextOffset's time
< SLA, we would check
+ // (CurrentTime - SLA)'s offset > nextOffset.
+ // TODO: it is relying on System.currentTimeMillis() which might be
affected by time drift. If we are able to
+ // get nextOffset's time, we should instead check (nextOffset's time +
SLA)'s offset < latestOffset
+ latestOffset = metadataProvider.fetchStreamPartitionOffset(
+ OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
+ LOGGER.info("Latest offset of topic {} and partition {} is {}",
streamConfig.getTopicName(), partitionId,
+ latestOffset);
+ if (timeThreshold > 0) {
+ offsetAtSLA =
+ metadataProvider.getOffsetAtTimestamp(partitionId,
System.currentTimeMillis() - timeThreshold * 1000,
+ STREAM_FETCH_TIMEOUT_MS);
+ LOGGER.info("Offset at SLA of topic {} and partition {} is {}",
streamConfig.getTopicName(), partitionId,
+ offsetAtSLA);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Not able to fetch the offset metadata, skip auto resetting
offsets", e);
+ return nextOffset;
+ }
+ try {
+ if (timeThreshold > 0 && offsetAtSLA != null &&
offsetAtSLA.compareTo(nextOffsetWithType) < 0) {
+ LOGGER.info("Auto reset offset from {} to {} on partition {} because
time threshold reached", nextOffset,
+ latestOffset, partitionId);
+ return latestOffset.toString();
+ }
+ if (offsetThreshold > 0
+ && Long.parseLong(latestOffset.toString()) -
Long.parseLong(nextOffset) > offsetThreshold) {
+ LOGGER.info("Auto reset offset from {} to {} on partition {} because
number of offsets threshold reached",
+ nextOffset, latestOffset, partitionId);
+ return latestOffset.toString();
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Not able to compare the offsets, skip auto resetting
offsets", e);
+ }
+ return nextOffset;
+ }
+
@Nullable
private SegmentPartitionMetadata
getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId,
int numPartitionGroups) {
@@ -1006,6 +1071,12 @@ public class PinotLLCRealtimeSegmentManager {
return commitTimeoutMS;
}
+ private String getTableTopicUniqueClientId(StreamConfig streamConfig) {
+ return StreamConsumerFactory.getUniqueClientId(
+ PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
+ + streamConfig.getTopicName());
+ }
+
/**
* Fetches the partition ids for the stream. Some stream (e.g. Kinesis)
might not support this operation, in which
* case exception will be thrown.
@@ -1013,9 +1084,7 @@ public class PinotLLCRealtimeSegmentManager {
@VisibleForTesting
Set<Integer> getPartitionIds(StreamConfig streamConfig)
throws Exception {
- String clientId = StreamConsumerFactory.getUniqueClientId(
- PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
- + streamConfig.getTopicName());
+ String clientId = getTableTopicUniqueClientId(streamConfig);
StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
try (StreamMetadataProvider metadataProvider =
consumerFactory.createStreamMetadataProvider(clientId)) {
return metadataProvider.fetchPartitionIds(5000L);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index d15f87efbdc..263484c5d9d 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -77,9 +77,15 @@ import
org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
+import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
@@ -91,6 +97,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.apache.zookeeper.data.Stat;
import org.joda.time.Interval;
+import org.mockito.MockedStatic;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
@@ -122,6 +129,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
static final String CRC = Long.toString(RANDOM.nextLong() & 0xFFFFFFFFL);
static final SegmentVersion SEGMENT_VERSION = RANDOM.nextBoolean() ?
SegmentVersion.v1 : SegmentVersion.v3;
static final int NUM_DOCS = RANDOM.nextInt(Integer.MAX_VALUE) + 1;
+ static final long LATEST_OFFSET = PARTITION_OFFSET.getOffset() * 2 +
NUM_DOCS;
static final int SEGMENT_SIZE_IN_BYTES = 100000000;
@AfterClass
public void tearDown()
@@ -325,6 +333,139 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertNull(consumingSegmentZKMetadata);
}
+ @Test
+ public void testCommitSegmentWithOffsetAutoResetOnOffset()
+ throws Exception {
+ // Set up a new table with 2 replicas, 5 instances, 4 partitions
+ PinotHelixResourceManager mockHelixResourceManager =
mock(PinotHelixResourceManager.class);
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
+ setUpNewTable(segmentManager, 2, 5, 4);
+ Map<String, Map<String, String>> instanceStatesMap =
segmentManager._idealState.getRecord().getMapFields();
+ Map<String, String> streamConfigMap =
IngestionConfigUtils.getStreamConfigMaps(segmentManager._tableConfig).get(0);
+ streamConfigMap.put(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET,
String.valueOf(true));
+
streamConfigMap.put(StreamConfigProperties.OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY,
"100");
+ segmentManager.makeTableConfig(streamConfigMap);
+
+ StreamConsumerFactory mockConsumerFactory =
mock(StreamConsumerFactory.class);
+ StreamMetadataProvider mockMetadataProvider =
mock(StreamMetadataProvider.class);
+ when(mockConsumerFactory.createPartitionMetadataProvider(anyString(),
anyInt())).thenReturn(mockMetadataProvider);
+ when(mockConsumerFactory.createStreamMsgOffsetFactory()).thenReturn(new
LongMsgOffsetFactory());
+
when(mockMetadataProvider.fetchStreamPartitionOffset(eq(OffsetCriteria.LARGEST_OFFSET_CRITERIA),
+ anyLong())).thenReturn(new LongMsgOffset(LATEST_OFFSET));
+ when(mockMetadataProvider.getOffsetAtTimestamp(eq(0), anyLong(),
anyLong())).thenReturn(PARTITION_OFFSET);
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedStaticProvider =
mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+
+ mockedStaticProvider.when(() ->
StreamConsumerFactoryProvider.create(segmentManager._streamConfigs.get(0)))
+ .thenReturn(mockConsumerFactory);
+
+ // Commit a segment for partition group 0
+ String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
+ String endOffset = new LongMsgOffset(PARTITION_OFFSET.getOffset() +
NUM_DOCS).toString();
+ CommittingSegmentDescriptor committingSegmentDescriptor =
+ new CommittingSegmentDescriptor(committingSegment, endOffset,
SEGMENT_SIZE_IN_BYTES);
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME,
committingSegmentDescriptor);
+
+ // Verify instance states for committed segment and new consuming segment
+ Map<String, String> committedSegmentInstanceStateMap =
instanceStatesMap.get(committingSegment);
+ assertNotNull(committedSegmentInstanceStateMap);
+ assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
+ Collections.singleton(SegmentStateModel.ONLINE));
+
+ String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1,
CURRENT_TIME_MS).getSegmentName();
+ Map<String, String> consumingSegmentInstanceStateMap =
instanceStatesMap.get(consumingSegment);
+ assertNotNull(consumingSegmentInstanceStateMap);
+ assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
+ Collections.singleton(SegmentStateModel.CONSUMING));
+
+ // Verify segment ZK metadata for committed segment and new consuming
segment
+ SegmentZKMetadata committedSegmentZKMetadata =
segmentManager._segmentZKMetadataMap.get(committingSegment);
+ assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
+ assertEquals(committedSegmentZKMetadata.getStartOffset(),
PARTITION_OFFSET.toString());
+ assertEquals(committedSegmentZKMetadata.getEndOffset(), endOffset);
+ assertEquals(committedSegmentZKMetadata.getCreationTime(),
CURRENT_TIME_MS);
+ assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
+ assertEquals(committedSegmentZKMetadata.getIndexVersion(),
SEGMENT_VERSION.name());
+ assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
+ assertEquals(committedSegmentZKMetadata.getSizeInBytes(),
SEGMENT_SIZE_IN_BYTES);
+
+ SegmentZKMetadata consumingSegmentZKMetadata =
segmentManager._segmentZKMetadataMap.get(consumingSegment);
+ assertEquals(consumingSegmentZKMetadata.getStatus(), Status.IN_PROGRESS);
+ assertEquals(consumingSegmentZKMetadata.getStartOffset(),
String.valueOf(LATEST_OFFSET));
+ assertEquals(committedSegmentZKMetadata.getCreationTime(),
CURRENT_TIME_MS);
+ }
+ }
+
+ @Test
+ public void testCommitSegmentWithOffsetAutoResetOnTime()
+ throws Exception {
+ // Set up a new table with 2 replicas, 5 instances, 4 partitions
+ PinotHelixResourceManager mockHelixResourceManager =
mock(PinotHelixResourceManager.class);
+ FakePinotLLCRealtimeSegmentManager segmentManager =
+ new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
+ setUpNewTable(segmentManager, 2, 5, 4);
+ Map<String, Map<String, String>> instanceStatesMap =
segmentManager._idealState.getRecord().getMapFields();
+ Map<String, String> streamConfigMap =
IngestionConfigUtils.getStreamConfigMaps(segmentManager._tableConfig).get(0);
+ streamConfigMap.put(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET,
String.valueOf(true));
+
streamConfigMap.put(StreamConfigProperties.OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY,
"1800");
+ segmentManager.makeTableConfig(streamConfigMap);
+
+ StreamConsumerFactory mockConsumerFactory =
mock(StreamConsumerFactory.class);
+ StreamMetadataProvider mockMetadataProvider =
mock(StreamMetadataProvider.class);
+ when(mockConsumerFactory.createPartitionMetadataProvider(anyString(),
anyInt())).thenReturn(mockMetadataProvider);
+ when(mockConsumerFactory.createStreamMsgOffsetFactory()).thenReturn(new
LongMsgOffsetFactory());
+
when(mockMetadataProvider.fetchStreamPartitionOffset(eq(OffsetCriteria.LARGEST_OFFSET_CRITERIA),
+ anyLong())).thenReturn(new LongMsgOffset(LATEST_OFFSET));
+ when(mockMetadataProvider.getOffsetAtTimestamp(eq(0), anyLong(),
anyLong())).thenReturn(
+ new LongMsgOffset(PARTITION_OFFSET.getOffset() + 1L));
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedStaticProvider =
mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+
+ mockedStaticProvider.when(() ->
StreamConsumerFactoryProvider.create(segmentManager._streamConfigs.get(0)))
+ .thenReturn(mockConsumerFactory);
+
+ // Commit a segment for partition group 0
+ String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
+ String endOffset = new LongMsgOffset(PARTITION_OFFSET.getOffset() +
NUM_DOCS).toString();
+ CommittingSegmentDescriptor committingSegmentDescriptor =
+ new CommittingSegmentDescriptor(committingSegment, endOffset,
SEGMENT_SIZE_IN_BYTES);
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME,
committingSegmentDescriptor);
+
+ // Verify instance states for committed segment and new consuming segment
+ Map<String, String> committedSegmentInstanceStateMap =
instanceStatesMap.get(committingSegment);
+ assertNotNull(committedSegmentInstanceStateMap);
+ assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
+ Collections.singleton(SegmentStateModel.ONLINE));
+
+ String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1,
CURRENT_TIME_MS).getSegmentName();
+ Map<String, String> consumingSegmentInstanceStateMap =
instanceStatesMap.get(consumingSegment);
+ assertNotNull(consumingSegmentInstanceStateMap);
+ assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
+ Collections.singleton(SegmentStateModel.CONSUMING));
+
+ // Verify segment ZK metadata for committed segment and new consuming
segment
+ SegmentZKMetadata committedSegmentZKMetadata =
segmentManager._segmentZKMetadataMap.get(committingSegment);
+ assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
+ assertEquals(committedSegmentZKMetadata.getStartOffset(),
PARTITION_OFFSET.toString());
+ assertEquals(committedSegmentZKMetadata.getEndOffset(), endOffset);
+ assertEquals(committedSegmentZKMetadata.getCreationTime(),
CURRENT_TIME_MS);
+ assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
+ assertEquals(committedSegmentZKMetadata.getIndexVersion(),
SEGMENT_VERSION.name());
+ assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
+ assertEquals(committedSegmentZKMetadata.getSizeInBytes(),
SEGMENT_SIZE_IN_BYTES);
+
+ SegmentZKMetadata consumingSegmentZKMetadata =
segmentManager._segmentZKMetadataMap.get(consumingSegment);
+ assertEquals(consumingSegmentZKMetadata.getStatus(), Status.IN_PROGRESS);
+ assertEquals(consumingSegmentZKMetadata.getStartOffset(),
String.valueOf(LATEST_OFFSET));
+ assertEquals(committedSegmentZKMetadata.getCreationTime(),
CURRENT_TIME_MS);
+ }
+ }
+
/**
* Test cases for the scenario where stream partitions increase, and the
validation manager is attempting to create
* segments for new partitions. This test assumes that all other factors
remain the same (no error conditions or
@@ -1725,6 +1866,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
_streamConfigs = IngestionConfigUtils.getStreamConfigs(_tableConfig);
}
+ void makeTableConfig(Map<String, String> streamConfigMap) {
+ _tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(_numReplicas)
+ .setStreamConfigs(streamConfigMap).build();
+ _streamConfigs = IngestionConfigUtils.getStreamConfigs(_tableConfig);
+ }
+
void makeConsumingInstancePartitions() {
List<String> instances = new ArrayList<>(_numInstances);
for (int i = 0; i < _numInstances; i++) {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 0b13c05ba48..66361984bb2 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -187,6 +187,12 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
}
}
+ @Override
+ public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long
timestampMillis, long timeoutMillis) {
+ return new LongMsgOffset(_consumer.offsetsForTimes(Map.of(_topicPartition,
timestampMillis),
+ Duration.ofMillis(timeoutMillis)).get(_topicPartition).offset());
+ }
+
public static class KafkaTopicMetadata implements TopicMetadata {
private String _name;
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 62a181f605d..c0d005ae26d 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -199,6 +199,13 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
return this;
}
}
+
+ @Override
+ public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long
timestampMillis, long timeoutMillis) {
+ return new
LongMsgOffset(_consumer.offsetsForTimes(Map.of(_topicPartition,
timestampMillis),
+ Duration.ofMillis(timeoutMillis)).get(_topicPartition).offset());
+ }
+
@Override
public void close()
throws IOException {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index d907c0a9e4a..6fab25a51e8 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -75,6 +75,10 @@ public class StreamConfig {
private final double _topicConsumptionRateLimit;
+ private final boolean _enableOffsetAutoReset;
+ private final int _offsetAutoResetOffsetThreshold;
+ private final long _offsetAutoResetTimeSecThreshold;
+
private final Map<String, String> _streamConfigMap = new HashMap<>();
// Allow overriding it to use different offset criteria
@@ -199,6 +203,10 @@ public class StreamConfig {
String rate =
streamConfigMap.get(StreamConfigProperties.TOPIC_CONSUMPTION_RATE_LIMIT);
_topicConsumptionRateLimit = rate != null ? Double.parseDouble(rate) :
CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED;
+ _enableOffsetAutoReset =
Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET));
+ _offsetAutoResetOffsetThreshold =
parseOffsetAutoResetOffsetThreshold(streamConfigMap);
+ _offsetAutoResetTimeSecThreshold =
parseOffsetAutoResetTimeSecThreshold(streamConfigMap);
+
_streamConfigMap.putAll(streamConfigMap);
}
@@ -310,6 +318,34 @@ public class StreamConfig {
}
}
+ public static int parseOffsetAutoResetOffsetThreshold(Map<String, String>
streamConfigMap) {
+ String key = StreamConfigProperties.OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY;
+ String offsetAutoResetOffsetThresholdStr = streamConfigMap.get(key);
+ if (offsetAutoResetOffsetThresholdStr != null) {
+ try {
+ return Integer.parseInt(offsetAutoResetOffsetThresholdStr);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid config " + key + ": " +
offsetAutoResetOffsetThresholdStr);
+ }
+ } else {
+ return -1; // Default value indicating disabled
+ }
+ }
+
+ public static long parseOffsetAutoResetTimeSecThreshold(Map<String, String>
streamConfigMap) {
+ String key =
StreamConfigProperties.OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY;
+ String offsetAutoResetTimeSecThresholdStr = streamConfigMap.get(key);
+ if (offsetAutoResetTimeSecThresholdStr != null) {
+ try {
+ return Long.parseLong(offsetAutoResetTimeSecThresholdStr);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Invalid config " + key + ": " +
offsetAutoResetTimeSecThresholdStr);
+ }
+ } else {
+ return -1; // Default value indicating disabled
+ }
+ }
+
public String getType() {
return _type;
}
@@ -383,6 +419,18 @@ public class StreamConfig {
: Optional.of(_topicConsumptionRateLimit);
}
+ public boolean isEnableOffsetAutoReset() {
+ return _enableOffsetAutoReset;
+ }
+
+ public int getOffsetAutoResetOffsetThreshold() {
+ return _offsetAutoResetOffsetThreshold;
+ }
+
+ public long getOffsetAutoResetTimeSecThreshold() {
+ return _offsetAutoResetTimeSecThreshold;
+ }
+
public String getTableNameWithType() {
return _tableNameWithType;
}
@@ -402,7 +450,11 @@ public class StreamConfig {
+ _flushThresholdTimeMillis + ", _flushThresholdSegmentSizeBytes=" +
_flushThresholdSegmentSizeBytes
+ ", _flushThresholdVarianceFraction=" +
_flushThresholdVarianceFraction
+ ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows + ",
_groupId='" + _groupId + '\''
- + ", _topicConsumptionRateLimit=" + _topicConsumptionRateLimit + ",
_streamConfigMap=" + _streamConfigMap
+ + ", _topicConsumptionRateLimit=" + _topicConsumptionRateLimit
+ + ", _enableOffsetAutoReset=" + _enableOffsetAutoReset
+ + ", _offsetAutoResetOffsetThreshold=" + _offsetAutoResetOffsetThreshold
+ + ", _offsetAutoResetTimeSecThreshold=" +
_offsetAutoResetTimeSecThreshold
+ + ", _streamConfigMap=" + _streamConfigMap
+ ", _offsetCriteria=" + _offsetCriteria + ",
_serverUploadToDeepStore=" + _serverUploadToDeepStore + '}';
}
@@ -427,7 +479,10 @@ public class StreamConfig {
&& Objects.equals(_consumerFactoryClassName,
that._consumerFactoryClassName) && Objects.equals(_decoderClass,
that._decoderClass) && Objects.equals(_decoderProperties,
that._decoderProperties) && Objects.equals(_groupId,
that._groupId) && Objects.equals(_streamConfigMap,
that._streamConfigMap) && Objects.equals(_offsetCriteria,
- that._offsetCriteria) &&
Objects.equals(_flushThresholdVarianceFraction,
that._flushThresholdVarianceFraction);
+ that._offsetCriteria) &&
Objects.equals(_flushThresholdVarianceFraction,
that._flushThresholdVarianceFraction)
+ && _enableOffsetAutoReset == that._enableOffsetAutoReset
+ && _offsetAutoResetOffsetThreshold ==
that._offsetAutoResetOffsetThreshold
+ && _offsetAutoResetTimeSecThreshold ==
that._offsetAutoResetTimeSecThreshold;
}
@Override
@@ -436,6 +491,7 @@ public class StreamConfig {
_decoderProperties, _connectionTimeoutMillis, _fetchTimeoutMillis,
_idleTimeoutMillis, _flushThresholdRows,
_flushThresholdSegmentRows, _flushThresholdTimeMillis,
_flushThresholdSegmentSizeBytes,
_flushAutotuneInitialRows, _groupId, _topicConsumptionRateLimit,
_streamConfigMap, _offsetCriteria,
- _serverUploadToDeepStore, _flushThresholdVarianceFraction);
+ _serverUploadToDeepStore, _flushThresholdVarianceFraction,
_offsetAutoResetOffsetThreshold,
+ _enableOffsetAutoReset, _offsetAutoResetTimeSecThreshold);
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index 4286adbbb27..79db54dd288 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -143,6 +143,25 @@ public class StreamConfigProperties {
public static final String PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS =
"realtime.segment.pauseless.download.timeoutSeconds";
+ /**
+ * Config used to enable offset auto reset during segment commit.
+ */
+ public static final String ENABLE_OFFSET_AUTO_RESET =
"realtime.segment.offsetAutoReset.enable";
+
+ /**
+ * During segment commit, the new segment startOffset would skip to the
latest offset if this threshold is set as positive
+ * and (latestStreamOffset - latestIngestedOffset > threshold)
+ */
+ public static final String OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY =
+ "realtime.segment.offsetAutoReset.offsetThreshold";
+
+ /**
+ * During segment commit, the new segment startOffset would skip to the
latest offset if this threshold is set as positive
+ * and (latestStreamOffset's timestamp - latestIngestedOffset's timestamp >
threshold)
+ */
+ public static final String OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY =
+ "realtime.segment.offsetAutoReset.timeThresholdSeconds";
+
/**
* Helper method to create a stream specific property
*/
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 66bf9768b5f..6cf724b058b 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -127,6 +128,11 @@ public interface StreamMetadataProvider extends Closeable {
return result;
}
+ @Nullable
+ default StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long
timestampMillis, long timeoutMillis) {
+ return null;
+ }
+
/**
* Fetches the list of available topics/streams
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]