michael2893 opened a new pull request, #239: URL: https://github.com/apache/flink-connector-kafka/pull/239
## Use configured starting offsets for newly discovered partitions

### Summary

- Fix `KafkaSourceEnumerator` to use the user-configured `startingOffsetInitializer` for newly discovered partitions instead of a hardcoded `OffsetsInitializer.earliest()`
- Update `testDiscoverPartitionsPeriodically` to assert the new behavior

### Problem

When a Kafka topic is removed and re-added (or a new topic appears via periodic partition discovery), the Flink Kafka connector ignores the user's configured starting offset and always starts reading from the **earliest** offset. This causes a full replay of the topic from its earliest retained offset (offset 0 for a freshly recreated topic).

**Root cause:** `KafkaSourceEnumerator` maintains two offset initializers:

- `startingOffsetInitializer`: the user's configured value (e.g. `OffsetsInitializer.latest()`)
- `newDiscoveryOffsetsInitializer`: hardcoded to `OffsetsInitializer.earliest()` on [line 197](flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L197)

On first startup, `initialDiscoveryFinished = false`, so all partitions go through `startingOffsetInitializer`. After the first discovery completes, `initialDiscoveryFinished = true` is persisted in checkpoint state. Any partition discovered after that point, including partitions of topics that were removed and recreated, uses `newDiscoveryOffsetsInitializer` (`earliest()`).

### Fix

Changed `KafkaSourceEnumerator.java:197` from:

```java
this.newDiscoveryOffsetsInitializer = OffsetsInitializer.earliest();
```

to:

```java
this.newDiscoveryOffsetsInitializer = startingOffsetInitializer;
```

This ensures newly discovered partitions respect the same offset strategy the user configured via `.setStartingOffsets()`.
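To make the root cause concrete, here is a minimal, self-contained model of the initializer selection described in this PR. It is a simplified sketch, not Flink code: the `OffsetStrategy` enum, the `EnumeratorModel` class, and the `[beginning, end)` offset ranges are hypothetical stand-ins. It only mirrors two things from the description: the `initialDiscoveryFinished` gating and the one-line change to `newDiscoveryOffsetsInitializer`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of how the enumerator chooses an offset
 * initializer for a partition. OffsetStrategy and EnumeratorModel are
 * illustrative stand-ins, not Flink classes.
 */
public class NewPartitionOffsetsDemo {

    /** Stand-in for OffsetsInitializer.earliest() / OffsetsInitializer.latest(). */
    enum OffsetStrategy {
        EARLIEST,
        LATEST;

        /** Picks a starting offset from a partition's retained range [beginning, end). */
        long resolve(long beginningOffset, long endOffset) {
            return this == EARLIEST ? beginningOffset : endOffset;
        }
    }

    static final class EnumeratorModel {
        private final OffsetStrategy startingOffsetInitializer;
        private final OffsetStrategy newDiscoveryOffsetsInitializer;
        private boolean initialDiscoveryFinished;

        EnumeratorModel(OffsetStrategy starting, boolean applyFix, boolean restoredInitialDiscoveryFinished) {
            this.startingOffsetInitializer = starting;
            // Pre-fix behavior: hardcoded EARLIEST. Post-fix: reuse the user's strategy.
            this.newDiscoveryOffsetsInitializer = applyFix ? starting : OffsetStrategy.EARLIEST;
            // Simulates the flag being restored from checkpoint state.
            this.initialDiscoveryFinished = restoredInitialDiscoveryFinished;
        }

        /** Returns a starting offset for every partition found in one discovery round. */
        Map<String, Long> assignStartingOffsets(Map<String, long[]> discovered) {
            OffsetStrategy initializer =
                    initialDiscoveryFinished ? newDiscoveryOffsetsInitializer : startingOffsetInitializer;
            Map<String, Long> result = new HashMap<>();
            for (Map.Entry<String, long[]> e : discovered.entrySet()) {
                long[] range = e.getValue();
                result.put(e.getKey(), initializer.resolve(range[0], range[1]));
            }
            // After the first round, every later discovery takes the "new discovery" path.
            initialDiscoveryFinished = true;
            return result;
        }
    }

    public static void main(String[] args) {
        // A re-added topic partition whose retained offsets span [0, 500).
        Map<String, long[]> readded = Map.of("orders-0", new long[] {0L, 500L});

        // Both models are configured with LATEST and restored with initialDiscoveryFinished = true.
        EnumeratorModel before = new EnumeratorModel(OffsetStrategy.LATEST, false, true);
        EnumeratorModel after = new EnumeratorModel(OffsetStrategy.LATEST, true, true);

        System.out.println("before fix: " + before.assignStartingOffsets(readded));
        System.out.println("after fix:  " + after.assignStartingOffsets(readded));
    }
}
```

Running `main` prints `before fix: {orders-0=0}` and `after fix:  {orders-0=500}`. This shows that with a restored `initialDiscoveryFinished = true`, only the post-fix model honors the configured `LATEST` strategy for the re-added partition.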
### Files changed

| File | Change |
|------|--------|
| `KafkaSourceEnumerator.java` | Use `startingOffsetInitializer` instead of `OffsetsInitializer.earliest()` for `newDiscoveryOffsetsInitializer` |
| `KafkaSourceEnumeratorTest.java` | Update `testDiscoverPartitionsPeriodically` assertions to expect `latest()` offsets for dynamically discovered partitions |

### Test plan

- [ ] `KafkaSourceEnumeratorTest.testDiscoverPartitionsPeriodically` passes with updated assertions
- [ ] All existing `KafkaSourceEnumeratorTest` tests pass (parameterized offset initializer tests, snapshot state tests, add splits back tests)
- [ ] Manual verification: deploy a pipeline with `.setStartingOffsets(OffsetsInitializer.latest())`, remove a topic, re-add it, confirm it does NOT replay from earliest
