This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eec756e989d [Auto reset 1/3]Auto reset offset during ingestion lag 
(#16492)
eec756e989d is described below

commit eec756e989d93c8d1806860cb75ea7f195de8186
Author: lnbest0707 <[email protected]>
AuthorDate: Fri Aug 15 14:05:20 2025 -0700

    [Auto reset 1/3]Auto reset offset during ingestion lag (#16492)
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  77 ++++++++++-
 .../PinotLLCRealtimeSegmentManagerTest.java        | 148 +++++++++++++++++++++
 .../kafka20/KafkaStreamMetadataProvider.java       |   6 +
 .../kafka30/KafkaStreamMetadataProvider.java       |   7 +
 .../org/apache/pinot/spi/stream/StreamConfig.java  |  62 ++++++++-
 .../pinot/spi/stream/StreamConfigProperties.java   |  19 +++
 .../pinot/spi/stream/StreamMetadataProvider.java   |   6 +
 7 files changed, 318 insertions(+), 7 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e0e5d291445..0d135dd37e3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -171,6 +171,8 @@ public class PinotLLCRealtimeSegmentManager {
 
   // Max time to wait for all LLC segments to complete committing their 
metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 
30_000L;
+  // Timeout for calling stream metadata provider APIs
+  private static final long STREAM_FETCH_TIMEOUT_MS = 5_000L;
 
   // TODO: make this configurable with default set to 10
   /**
@@ -927,7 +929,11 @@ public class PinotLLCRealtimeSegmentManager {
       int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     String segmentName = newLLCSegmentName.getSegmentName();
-    String startOffset = committingSegmentDescriptor.getNextOffset();
+
+    // Handle offset auto reset
+    String nextOffset = committingSegmentDescriptor.getNextOffset();
+    String startOffset = computeStartOffset(
+        nextOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
 
     LOGGER.info(
         "Creating segment ZK metadata for new CONSUMING segment: {} with start 
offset: {} and creation time: {}",
@@ -958,6 +964,65 @@ public class PinotLLCRealtimeSegmentManager {
     persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
   }
 
+  private String computeStartOffset(String nextOffset, StreamConfig 
streamConfig, int partitionId) {
+    if (!streamConfig.isEnableOffsetAutoReset()) {
+      return nextOffset;
+    }
+    long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+    int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+    if (timeThreshold <= 0 && offsetThreshold <= 0) {
+      LOGGER.warn("Invalid offset auto reset configuration for table: {}, 
topic: {}. "
+              + "timeThreshold: {}, offsetThreshold: {}",
+          streamConfig.getTableNameWithType(), streamConfig.getTopicName(), 
timeThreshold, offsetThreshold);
+      return nextOffset;
+    }
+    String clientId = getTableTopicUniqueClientId(streamConfig);
+    StreamConsumerFactory consumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
+    StreamPartitionMsgOffsetFactory offsetFactory = 
consumerFactory.createStreamMsgOffsetFactory();
+    StreamPartitionMsgOffset nextOffsetWithType = 
offsetFactory.create(nextOffset);
+    StreamPartitionMsgOffset offsetAtSLA = null;
+    StreamPartitionMsgOffset latestOffset;
+    try (StreamMetadataProvider metadataProvider = 
consumerFactory.createPartitionMetadataProvider(clientId,
+        partitionId)) {
+      // Fetching timestamp from an offset is an expensive operation which 
requires reading the data,
+      // while fetching offset from timestamp is lightweight and only needs to 
read metadata.
+      // Hence, instead of checking if latestOffset's time - nextOffset's time 
< SLA, we would check
+      // (CurrentTime - SLA)'s offset > nextOffset.
+      // TODO: it is relying on System.currentTimeMillis() which might be 
affected by time drift. If we are able to
+      // get nextOffset's time, we should instead check (nextOffset's time + 
SLA)'s offset < latestOffset
+      latestOffset = metadataProvider.fetchStreamPartitionOffset(
+          OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
+      LOGGER.info("Latest offset of topic {} and partition {} is {}", 
streamConfig.getTopicName(), partitionId,
+          latestOffset);
+      if (timeThreshold > 0) {
+        offsetAtSLA =
+            metadataProvider.getOffsetAtTimestamp(partitionId, 
System.currentTimeMillis() - timeThreshold * 1000,
+                STREAM_FETCH_TIMEOUT_MS);
+        LOGGER.info("Offset at SLA of topic {} and partition {} is {}", 
streamConfig.getTopicName(), partitionId,
+            offsetAtSLA);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Not able to fetch the offset metadata, skip auto resetting 
offsets", e);
+      return nextOffset;
+    }
+    try {
+      if (timeThreshold > 0 && offsetAtSLA != null && 
offsetAtSLA.compareTo(nextOffsetWithType) < 0) {
+        LOGGER.info("Auto reset offset from {} to {} on partition {} because 
time threshold reached", nextOffset,
+            latestOffset, partitionId);
+        return latestOffset.toString();
+      }
+      if (offsetThreshold > 0
+          && Long.parseLong(latestOffset.toString()) - 
Long.parseLong(nextOffset) > offsetThreshold) {
+        LOGGER.info("Auto reset offset from {} to {} on partition {} because 
number of offsets threshold reached",
+            nextOffset, latestOffset, partitionId);
+        return latestOffset.toString();
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Not able to compare the offsets, skip auto resetting 
offsets", e);
+    }
+    return nextOffset;
+  }
+
   @Nullable
   private SegmentPartitionMetadata 
getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId,
       int numPartitionGroups) {
@@ -1006,6 +1071,12 @@ public class PinotLLCRealtimeSegmentManager {
     return commitTimeoutMS;
   }
 
+  private String getTableTopicUniqueClientId(StreamConfig streamConfig) {
+    return StreamConsumerFactory.getUniqueClientId(
+        PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + 
streamConfig.getTableNameWithType() + "-"
+            + streamConfig.getTopicName());
+  }
+
   /**
    * Fetches the partition ids for the stream. Some stream (e.g. Kinesis) 
might not support this operation, in which
    * case exception will be thrown.
@@ -1013,9 +1084,7 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   Set<Integer> getPartitionIds(StreamConfig streamConfig)
       throws Exception {
-    String clientId = StreamConsumerFactory.getUniqueClientId(
-        PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + 
streamConfig.getTableNameWithType() + "-"
-            + streamConfig.getTopicName());
+    String clientId = getTableTopicUniqueClientId(streamConfig);
     StreamConsumerFactory consumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
     try (StreamMetadataProvider metadataProvider = 
consumerFactory.createStreamMetadataProvider(clientId)) {
       return metadataProvider.fetchPartitionIds(5000L);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index d15f87efbdc..263484c5d9d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -77,9 +77,15 @@ import 
org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
+import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
@@ -91,6 +97,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.apache.zookeeper.data.Stat;
 import org.joda.time.Interval;
+import org.mockito.MockedStatic;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
@@ -122,6 +129,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
   static final String CRC = Long.toString(RANDOM.nextLong() & 0xFFFFFFFFL);
   static final SegmentVersion SEGMENT_VERSION = RANDOM.nextBoolean() ? 
SegmentVersion.v1 : SegmentVersion.v3;
   static final int NUM_DOCS = RANDOM.nextInt(Integer.MAX_VALUE) + 1;
+  static final long LATEST_OFFSET = PARTITION_OFFSET.getOffset() * 2 + 
NUM_DOCS;
   static final int SEGMENT_SIZE_IN_BYTES = 100000000;
   @AfterClass
   public void tearDown()
@@ -325,6 +333,139 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assertNull(consumingSegmentZKMetadata);
   }
 
+  @Test
+  public void testCommitSegmentWithOffsetAutoResetOnOffset()
+      throws Exception {
+    // Set up a new table with 2 replicas, 5 instances, 4 partition
+    PinotHelixResourceManager mockHelixResourceManager = 
mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
+    setUpNewTable(segmentManager, 2, 5, 4);
+    Map<String, Map<String, String>> instanceStatesMap = 
segmentManager._idealState.getRecord().getMapFields();
+    Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMaps(segmentManager._tableConfig).get(0);
+    streamConfigMap.put(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET, 
String.valueOf(true));
+    
streamConfigMap.put(StreamConfigProperties.OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY,
 "100");
+    segmentManager.makeTableConfig(streamConfigMap);
+
+    StreamConsumerFactory mockConsumerFactory = 
mock(StreamConsumerFactory.class);
+    StreamMetadataProvider mockMetadataProvider = 
mock(StreamMetadataProvider.class);
+    when(mockConsumerFactory.createPartitionMetadataProvider(anyString(), 
anyInt())).thenReturn(mockMetadataProvider);
+    when(mockConsumerFactory.createStreamMsgOffsetFactory()).thenReturn(new 
LongMsgOffsetFactory());
+    
when(mockMetadataProvider.fetchStreamPartitionOffset(eq(OffsetCriteria.LARGEST_OFFSET_CRITERIA),
+        anyLong())).thenReturn(new LongMsgOffset(LATEST_OFFSET));
+    when(mockMetadataProvider.getOffsetAtTimestamp(eq(0), anyLong(), 
anyLong())).thenReturn(PARTITION_OFFSET);
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedStaticProvider = 
mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+
+      mockedStaticProvider.when(() -> 
StreamConsumerFactoryProvider.create(segmentManager._streamConfigs.get(0)))
+          .thenReturn(mockConsumerFactory);
+
+      // Commit a segment for partition group 0
+      String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
+      String endOffset = new LongMsgOffset(PARTITION_OFFSET.getOffset() + 
NUM_DOCS).toString();
+      CommittingSegmentDescriptor committingSegmentDescriptor =
+          new CommittingSegmentDescriptor(committingSegment, endOffset, 
SEGMENT_SIZE_IN_BYTES);
+      committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+      segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, 
committingSegmentDescriptor);
+
+      // Verify instance states for committed segment and new consuming segment
+      Map<String, String> committedSegmentInstanceStateMap = 
instanceStatesMap.get(committingSegment);
+      assertNotNull(committedSegmentInstanceStateMap);
+      assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
+          Collections.singleton(SegmentStateModel.ONLINE));
+
+      String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, 
CURRENT_TIME_MS).getSegmentName();
+      Map<String, String> consumingSegmentInstanceStateMap = 
instanceStatesMap.get(consumingSegment);
+      assertNotNull(consumingSegmentInstanceStateMap);
+      assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
+          Collections.singleton(SegmentStateModel.CONSUMING));
+
+      // Verify segment ZK metadata for committed segment and new consuming 
segment
+      SegmentZKMetadata committedSegmentZKMetadata = 
segmentManager._segmentZKMetadataMap.get(committingSegment);
+      assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
+      assertEquals(committedSegmentZKMetadata.getStartOffset(), 
PARTITION_OFFSET.toString());
+      assertEquals(committedSegmentZKMetadata.getEndOffset(), endOffset);
+      assertEquals(committedSegmentZKMetadata.getCreationTime(), 
CURRENT_TIME_MS);
+      assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
+      assertEquals(committedSegmentZKMetadata.getIndexVersion(), 
SEGMENT_VERSION.name());
+      assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
+      assertEquals(committedSegmentZKMetadata.getSizeInBytes(), 
SEGMENT_SIZE_IN_BYTES);
+
+      SegmentZKMetadata consumingSegmentZKMetadata = 
segmentManager._segmentZKMetadataMap.get(consumingSegment);
+      assertEquals(consumingSegmentZKMetadata.getStatus(), Status.IN_PROGRESS);
+      assertEquals(consumingSegmentZKMetadata.getStartOffset(), 
String.valueOf(LATEST_OFFSET));
+      assertEquals(committedSegmentZKMetadata.getCreationTime(), 
CURRENT_TIME_MS);
+    }
+  }
+
+  @Test
+  public void testCommitSegmentWithOffsetAutoResetOnTime()
+      throws Exception {
+    // Set up a new table with 2 replicas, 5 instances, 4 partition
+    PinotHelixResourceManager mockHelixResourceManager = 
mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
+    setUpNewTable(segmentManager, 2, 5, 4);
+    Map<String, Map<String, String>> instanceStatesMap = 
segmentManager._idealState.getRecord().getMapFields();
+    Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMaps(segmentManager._tableConfig).get(0);
+    streamConfigMap.put(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET, 
String.valueOf(true));
+    
streamConfigMap.put(StreamConfigProperties.OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY,
 "1800");
+    segmentManager.makeTableConfig(streamConfigMap);
+
+    StreamConsumerFactory mockConsumerFactory = 
mock(StreamConsumerFactory.class);
+    StreamMetadataProvider mockMetadataProvider = 
mock(StreamMetadataProvider.class);
+    when(mockConsumerFactory.createPartitionMetadataProvider(anyString(), 
anyInt())).thenReturn(mockMetadataProvider);
+    when(mockConsumerFactory.createStreamMsgOffsetFactory()).thenReturn(new 
LongMsgOffsetFactory());
+    
when(mockMetadataProvider.fetchStreamPartitionOffset(eq(OffsetCriteria.LARGEST_OFFSET_CRITERIA),
+        anyLong())).thenReturn(new LongMsgOffset(LATEST_OFFSET));
+    when(mockMetadataProvider.getOffsetAtTimestamp(eq(0), anyLong(), 
anyLong())).thenReturn(
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + 1L));
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedStaticProvider = 
mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+
+      mockedStaticProvider.when(() -> 
StreamConsumerFactoryProvider.create(segmentManager._streamConfigs.get(0)))
+          .thenReturn(mockConsumerFactory);
+
+      // Commit a segment for partition group 0
+      String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
+      String endOffset = new LongMsgOffset(PARTITION_OFFSET.getOffset() + 
NUM_DOCS).toString();
+      CommittingSegmentDescriptor committingSegmentDescriptor =
+          new CommittingSegmentDescriptor(committingSegment, endOffset, 
SEGMENT_SIZE_IN_BYTES);
+      committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+      segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, 
committingSegmentDescriptor);
+
+      // Verify instance states for committed segment and new consuming segment
+      Map<String, String> committedSegmentInstanceStateMap = 
instanceStatesMap.get(committingSegment);
+      assertNotNull(committedSegmentInstanceStateMap);
+      assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
+          Collections.singleton(SegmentStateModel.ONLINE));
+
+      String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, 
CURRENT_TIME_MS).getSegmentName();
+      Map<String, String> consumingSegmentInstanceStateMap = 
instanceStatesMap.get(consumingSegment);
+      assertNotNull(consumingSegmentInstanceStateMap);
+      assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
+          Collections.singleton(SegmentStateModel.CONSUMING));
+
+      // Verify segment ZK metadata for committed segment and new consuming 
segment
+      SegmentZKMetadata committedSegmentZKMetadata = 
segmentManager._segmentZKMetadataMap.get(committingSegment);
+      assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
+      assertEquals(committedSegmentZKMetadata.getStartOffset(), 
PARTITION_OFFSET.toString());
+      assertEquals(committedSegmentZKMetadata.getEndOffset(), endOffset);
+      assertEquals(committedSegmentZKMetadata.getCreationTime(), 
CURRENT_TIME_MS);
+      assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
+      assertEquals(committedSegmentZKMetadata.getIndexVersion(), 
SEGMENT_VERSION.name());
+      assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
+      assertEquals(committedSegmentZKMetadata.getSizeInBytes(), 
SEGMENT_SIZE_IN_BYTES);
+
+      SegmentZKMetadata consumingSegmentZKMetadata = 
segmentManager._segmentZKMetadataMap.get(consumingSegment);
+      assertEquals(consumingSegmentZKMetadata.getStatus(), Status.IN_PROGRESS);
+      assertEquals(consumingSegmentZKMetadata.getStartOffset(), 
String.valueOf(LATEST_OFFSET));
+      assertEquals(committedSegmentZKMetadata.getCreationTime(), 
CURRENT_TIME_MS);
+    }
+  }
+
   /**
    * Test cases for the scenario where stream partitions increase, and the 
validation manager is attempting to create
    * segments for new partitions. This test assumes that all other factors 
remain the same (no error conditions or
@@ -1725,6 +1866,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
       _streamConfigs = IngestionConfigUtils.getStreamConfigs(_tableConfig);
     }
 
+    void makeTableConfig(Map<String, String> streamConfigMap) {
+      _tableConfig =
+          new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(_numReplicas)
+              .setStreamConfigs(streamConfigMap).build();
+      _streamConfigs = IngestionConfigUtils.getStreamConfigs(_tableConfig);
+    }
+
     void makeConsumingInstancePartitions() {
       List<String> instances = new ArrayList<>(_numInstances);
       for (int i = 0; i < _numInstances; i++) {
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 0b13c05ba48..66361984bb2 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -187,6 +187,12 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
     }
   }
 
+  @Override
+  public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long 
timestampMillis, long timeoutMillis) {
+    return new LongMsgOffset(_consumer.offsetsForTimes(Map.of(_topicPartition, 
timestampMillis),
+            Duration.ofMillis(timeoutMillis)).get(_topicPartition).offset());
+  }
+
   public static class KafkaTopicMetadata implements TopicMetadata {
     private String _name;
 
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 62a181f605d..c0d005ae26d 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -199,6 +199,13 @@ public class KafkaStreamMetadataProvider extends 
KafkaPartitionLevelConnectionHa
       return this;
     }
   }
+
+  @Override
+  public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long 
timestampMillis, long timeoutMillis) {
+      return new 
LongMsgOffset(_consumer.offsetsForTimes(Map.of(_topicPartition, 
timestampMillis),
+              Duration.ofMillis(timeoutMillis)).get(_topicPartition).offset());
+  }
+
   @Override
   public void close()
       throws IOException {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index d907c0a9e4a..6fab25a51e8 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -75,6 +75,10 @@ public class StreamConfig {
 
   private final double _topicConsumptionRateLimit;
 
+  private final boolean _enableOffsetAutoReset;
+  private final int _offsetAutoResetOffsetThreshold;
+  private final long _offsetAutoResetTimeSecThreshold;
+
   private final Map<String, String> _streamConfigMap = new HashMap<>();
 
   // Allow overriding it to use different offset criteria
@@ -199,6 +203,10 @@ public class StreamConfig {
     String rate = 
streamConfigMap.get(StreamConfigProperties.TOPIC_CONSUMPTION_RATE_LIMIT);
     _topicConsumptionRateLimit = rate != null ? Double.parseDouble(rate) : 
CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED;
 
+    _enableOffsetAutoReset = 
Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET));
+    _offsetAutoResetOffsetThreshold = 
parseOffsetAutoResetOffsetThreshold(streamConfigMap);
+    _offsetAutoResetTimeSecThreshold = 
parseOffsetAutoResetTimeSecThreshold(streamConfigMap);
+
     _streamConfigMap.putAll(streamConfigMap);
   }
 
@@ -310,6 +318,34 @@ public class StreamConfig {
     }
   }
 
+  public static int parseOffsetAutoResetOffsetThreshold(Map<String, String> 
streamConfigMap) {
+    String key = StreamConfigProperties.OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY;
+    String offsetAutoResetOffsetThresholdStr = streamConfigMap.get(key);
+    if (offsetAutoResetOffsetThresholdStr != null) {
+      try {
+        return Integer.parseInt(offsetAutoResetOffsetThresholdStr);
+      } catch (Exception e) {
+        throw new IllegalArgumentException("Invalid config " + key + ": " + 
offsetAutoResetOffsetThresholdStr);
+      }
+    } else {
+      return -1; // Default value indicating disabled
+    }
+  }
+
+  public static long parseOffsetAutoResetTimeSecThreshold(Map<String, String> 
streamConfigMap) {
+    String key = 
StreamConfigProperties.OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY;
+    String offsetAutoResetTimeSecThresholdStr = streamConfigMap.get(key);
+    if (offsetAutoResetTimeSecThresholdStr != null) {
+      try {
+        return Long.parseLong(offsetAutoResetTimeSecThresholdStr);
+      } catch (Exception e) {
+        throw new IllegalArgumentException("Invalid config " + key + ": " + 
offsetAutoResetTimeSecThresholdStr);
+      }
+    } else {
+      return -1; // Default value indicating disabled
+    }
+  }
+
   public String getType() {
     return _type;
   }
@@ -383,6 +419,18 @@ public class StreamConfig {
         : Optional.of(_topicConsumptionRateLimit);
   }
 
+  public boolean isEnableOffsetAutoReset() {
+    return _enableOffsetAutoReset;
+  }
+
+  public int getOffsetAutoResetOffsetThreshold() {
+    return _offsetAutoResetOffsetThreshold;
+  }
+
+  public long getOffsetAutoResetTimeSecThreshold() {
+    return _offsetAutoResetTimeSecThreshold;
+  }
+
   public String getTableNameWithType() {
     return _tableNameWithType;
   }
@@ -402,7 +450,11 @@ public class StreamConfig {
         + _flushThresholdTimeMillis + ", _flushThresholdSegmentSizeBytes=" + 
_flushThresholdSegmentSizeBytes
         + ", _flushThresholdVarianceFraction=" + 
_flushThresholdVarianceFraction
         + ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows + ", 
_groupId='" + _groupId + '\''
-        + ", _topicConsumptionRateLimit=" + _topicConsumptionRateLimit + ", 
_streamConfigMap=" + _streamConfigMap
+        + ", _topicConsumptionRateLimit=" + _topicConsumptionRateLimit
+        + ", _enableOffsetAutoReset=" + _enableOffsetAutoReset
+        + ", _offsetAutoResetOffsetThreshold" + _offsetAutoResetOffsetThreshold
+        + ", _offSetAutoResetTimeSecThreshold" + 
_offsetAutoResetTimeSecThreshold
+        + ", _streamConfigMap=" + _streamConfigMap
         + ", _offsetCriteria=" + _offsetCriteria + ", 
_serverUploadToDeepStore=" + _serverUploadToDeepStore + '}';
   }
 
@@ -427,7 +479,10 @@ public class StreamConfig {
         && Objects.equals(_consumerFactoryClassName, 
that._consumerFactoryClassName) && Objects.equals(_decoderClass,
         that._decoderClass) && Objects.equals(_decoderProperties, 
that._decoderProperties) && Objects.equals(_groupId,
         that._groupId) && Objects.equals(_streamConfigMap, 
that._streamConfigMap) && Objects.equals(_offsetCriteria,
-        that._offsetCriteria) && 
Objects.equals(_flushThresholdVarianceFraction, 
that._flushThresholdVarianceFraction);
+        that._offsetCriteria) && 
Objects.equals(_flushThresholdVarianceFraction, 
that._flushThresholdVarianceFraction)
+        && _enableOffsetAutoReset == that._enableOffsetAutoReset
+        && _offsetAutoResetOffsetThreshold == 
that._offsetAutoResetOffsetThreshold
+        && _offsetAutoResetTimeSecThreshold == 
that._offsetAutoResetTimeSecThreshold;
   }
 
   @Override
@@ -436,6 +491,7 @@ public class StreamConfig {
         _decoderProperties, _connectionTimeoutMillis, _fetchTimeoutMillis, 
_idleTimeoutMillis, _flushThresholdRows,
         _flushThresholdSegmentRows, _flushThresholdTimeMillis, 
_flushThresholdSegmentSizeBytes,
         _flushAutotuneInitialRows, _groupId, _topicConsumptionRateLimit, 
_streamConfigMap, _offsetCriteria,
-        _serverUploadToDeepStore, _flushThresholdVarianceFraction);
+        _serverUploadToDeepStore, _flushThresholdVarianceFraction, 
_offsetAutoResetOffsetThreshold,
+        _enableOffsetAutoReset, _offsetAutoResetTimeSecThreshold);
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index 4286adbbb27..79db54dd288 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -143,6 +143,25 @@ public class StreamConfigProperties {
   public static final String PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS =
       "realtime.segment.pauseless.download.timeoutSeconds";
 
+  /**
+   * Config used to enable offset auto reset during segment commit.
+   */
+  public static final String ENABLE_OFFSET_AUTO_RESET = 
"realtime.segment.offsetAutoReset.enable";
+
+  /**
+   * During segment commit, the new segment's startOffset skips to the latest offset if this value is positive
+   * and (latestStreamOffset - latestIngestedOffset) > this value.
+   */
+  public static final String OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY =
+      "realtime.segment.offsetAutoReset.offsetThreshold";
+
+  /**
+   * During segment commit, the new segment's startOffset skips to the latest offset if this value is positive
+   * and (latestStreamOffset's timestamp - latestIngestedOffset's timestamp) > this value, in seconds.
+   */
+  public static final String OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY =
+      "realtime.segment.offsetAutoReset.timeThresholdSeconds";
+
   /**
    * Helper method to create a stream specific property
    */
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 66bf9768b5f..6cf724b058b 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 
@@ -127,6 +128,11 @@ public interface StreamMetadataProvider extends Closeable {
     return result;
   }
 
+  @Nullable
+  default StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long 
timestampMillis, long timeoutMillis) {
+    return null;
+  }
+
   /**
    * Fetches the list of available topics/streams
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to