michael2893 opened a new pull request, #239:
URL: https://github.com/apache/flink-connector-kafka/pull/239

   ## Use configured starting offsets for newly discovered partitions
   
   ### Summary
   
   - Fix `KafkaSourceEnumerator` to use the user-configured `startingOffsetInitializer` for newly discovered partitions instead of the hardcoded `OffsetsInitializer.earliest()`
   - Update `testDiscoverPartitionsPeriodically` to assert the new behavior
   
   ### Problem
   
   When a Kafka topic is removed and re-added, or when a new topic appears via periodic partition discovery, the Flink Kafka connector ignores the user's configured starting offset and always starts reading from the **earliest** offset. This replays every record still retained in the affected partitions, even if the job was configured to start elsewhere.
   
   **Root cause:** `KafkaSourceEnumerator` maintains two offset initializers:
   - `startingOffsetInitializer`: the user's configured value (e.g. `OffsetsInitializer.latest()`)
   - `newDiscoveryOffsetsInitializer`: hardcoded to `OffsetsInitializer.earliest()` on [line 197](flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L197)
   
   On first startup, `initialDiscoveryFinished` is `false`, so all partitions go through `startingOffsetInitializer`. After the first discovery completes, `initialDiscoveryFinished = true` is persisted in checkpoint state. Any partition discovered after that point, including partitions of topics that were removed and recreated, uses `newDiscoveryOffsetsInitializer` (`earliest()`).
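
The selection logic described above can be modeled in a few lines. This is a simplified sketch, not the connector's actual code: the class `InitializerChoiceSketch`, the `Strategy` enum, the `applyFix` flag, and the `discover()`/`twoRounds()` helpers are all hypothetical names introduced for illustration. Only the three field names mirror those mentioned in this description.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the initializer choice; not the connector's real code. */
public class InitializerChoiceSketch {

    /** Stand-in for OffsetsInitializer; only the two strategies discussed here. */
    public enum Strategy { EARLIEST, LATEST }

    private final Strategy startingOffsetInitializer;      // user-configured value
    private final Strategy newDiscoveryOffsetsInitializer; // used after the first discovery
    private boolean initialDiscoveryFinished = false;      // checkpointed in the real enumerator

    InitializerChoiceSketch(Strategy configured, boolean applyFix) {
        this.startingOffsetInitializer = configured;
        // Without the fix this field is always EARLIEST; with it, it mirrors the user's choice.
        this.newDiscoveryOffsetsInitializer = applyFix ? configured : Strategy.EARLIEST;
    }

    /** Models one discovery round and returns the strategy applied to partitions found in it. */
    Strategy discover() {
        Strategy chosen = initialDiscoveryFinished
                ? newDiscoveryOffsetsInitializer
                : startingOffsetInitializer;
        initialDiscoveryFinished = true;
        return chosen;
    }

    /** Runs an initial round followed by one periodic round and returns both choices. */
    public static List<Strategy> twoRounds(Strategy configured, boolean applyFix) {
        InitializerChoiceSketch enumerator = new InitializerChoiceSketch(configured, applyFix);
        List<Strategy> choices = new ArrayList<>();
        choices.add(enumerator.discover());
        choices.add(enumerator.discover());
        return choices;
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + twoRounds(Strategy.LATEST, false)); // [LATEST, EARLIEST]
        System.out.println("with fix:    " + twoRounds(Strategy.LATEST, true));  // [LATEST, LATEST]
    }
}
```

With `latest()` configured, the second round in this model falls back to `EARLIEST` unless the new-discovery initializer is tied to the configured one, which is the behavior this PR targets.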
   
   ### Fix
   
   Changed `KafkaSourceEnumerator.java:197` from:
   ```java
   this.newDiscoveryOffsetsInitializer = OffsetsInitializer.earliest();
   ```
   to:
   ```java
   this.newDiscoveryOffsetsInitializer = startingOffsetInitializer;
   ```
   
   This ensures newly discovered partitions respect the same offset strategy 
the user configured via `.setStartingOffsets()`.
   
   ### Files changed
   
   | File | Change |
   |------|--------|
   | `KafkaSourceEnumerator.java` | Use `startingOffsetInitializer` instead of `OffsetsInitializer.earliest()` for `newDiscoveryOffsetsInitializer` |
   | `KafkaSourceEnumeratorTest.java` | Update `testDiscoverPartitionsPeriodically` assertions to expect `latest()` offsets for dynamically discovered partitions |
   
   ### Test plan
   
   - [ ] `KafkaSourceEnumeratorTest.testDiscoverPartitionsPeriodically` passes 
with updated assertions
   - [ ] All existing `KafkaSourceEnumeratorTest` tests pass (parameterized 
offset initializer tests, snapshot state tests, add splits back tests)
   - [ ] Manual verification: deploy a pipeline with 
`.setStartingOffsets(OffsetsInitializer.latest())`, remove a topic, re-add it, 
confirm it does NOT replay from earliest
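
For the manual verification step, a job along the following lines might be used. It is a sketch under assumptions, not part of this PR: the broker address, topic pattern, group id, discovery interval, and the class name `LatestOffsetsDiscoveryJob` are placeholders, and it needs the Flink streaming and Kafka connector dependencies plus a reachable broker to run.

```java
import java.util.regex.Pattern;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Hypothetical verification job; all connection values below are placeholders. */
public class LatestOffsetsDiscoveryJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is enabled so the enumerator state is persisted between discoveries.
        env.enableCheckpointing(10_000L);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")          // placeholder broker
                .setTopicPattern(Pattern.compile("demo-.*"))    // placeholder pattern, so re-created topics match
                .setGroupId("discovery-check")                  // placeholder group id
                .setStartingOffsets(OffsetsInitializer.latest())
                // Enable periodic partition discovery every 10 seconds (placeholder interval).
                .setProperty("partition.discovery.interval.ms", "10000")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Print records so any replay of old data is visible in the task logs.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("latest-offsets-discovery-check");
    }
}
```

After deleting and re-creating a topic that matches the pattern, only records produced after the topic is rediscovered should appear in the output if the fix behaves as described.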
   

