[
https://issues.apache.org/jira/browse/FLINK-39435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18077201#comment-18077201
]
Hongshun Wang commented on FLINK-39435:
---------------------------------------
>the Enumerator assigns splits for newly discovered partitions (i.e.,
>partitions not present in any restored Reader State), the `startingOffset` in
>those splits should reflect the *user-configured* `setStartingOffsets()`
>value (e.g., `-1` for `latest()`)
This is by design in FLIP-288, to avoid data loss
(https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source).
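
A minimal sketch of the rule referred to here, assuming FLIP-288 works as the linked page describes: the user-configured initializer applies only to the first partition discovery, while partitions discovered later (including after a restore, where the first discovery is already recorded as finished) start from earliest so that records written before discovery are not skipped. The class and method names are hypothetical and are not the connector's API; the sentinel values `-1` and `-2` are the ones quoted in the issue below.

```java
public class DiscoveryOffsetSketch {
    // Sentinel values as quoted in the issue text below.
    static final long LATEST_OFFSET = -1L;
    static final long EARLIEST_OFFSET = -2L;

    /**
     * Hypothetical helper: picks the starting offset for a newly discovered partition.
     * Once the initial discovery has finished, the user-configured value is ignored
     * and earliest is used instead (assumed FLIP-288 behavior).
     */
    static long startingOffsetFor(boolean initialDiscoveryFinished, long userConfiguredOffset) {
        return initialDiscoveryFinished ? EARLIEST_OFFSET : userConfiguredOffset;
    }

    public static void main(String[] args) {
        // Fresh start: the configured latest() sentinel is honored.
        System.out.println(startingOffsetFor(false, LATEST_OFFSET)); // prints -1
        // Restore with initial discovery already done: falls back to earliest.
        System.out.println(startingOffsetFor(true, LATEST_OFFSET)); // prints -2
    }
}
```

Under this assumption, the `StartingOffset: -2` in the reported log is what a restored job would produce for `topic-B`, regardless of the configured `latest()`.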
> [flink-connector-kafka] Switching Kafka topics via Savepoint restore causes
> dual-topic consumption AND new topic consumed from earliest instead of
> configured offset
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39435
> URL: https://issues.apache.org/jira/browse/FLINK-39435
> Project: Flink
> Issue Type: Bug
> Environment: - Flink version: 1.17.0
> Reporter: sherlock-lin
> Priority: Major
> Labels: bug, flink-connector-kafka
> Attachments: image-2026-04-14-17-15-57-350.png,
> image-2026-04-14-17-16-33-680.png, image-2026-04-30-11-37-02-642.png
>
>
> In `KafkaSourceEnumerator.handlePartitionSplitChanges()`, there is a
> long-standing `TODO` comment:
> !image-2026-04-14-17-16-33-680.png!
>
> Two separate bugs are triggered simultaneously:
> *Bug 1 — Dual-topic consumption (old topic is never stopped)*
> The job consumes from both `topic-A` and `topic-B` simultaneously:
> - `topic-A` splits come from the Reader State restored from Savepoint
> (`SourceOperator.open()` → `sourceReader.addSplits(restoredSplits)`).
> - `topic-B` splits come from the Enumerator discovering new partitions
> (`initializePartitionSplits()` → `assignSplits()`).
> - In `KafkaPartitionSplitReader.handleSplitsChanges()`, new splits are merged into the existing assignment:
> ```java
> newPartitionAssignments.addAll(consumer.assignment()); // merges, does NOT replace
> consumer.assign(newPartitionAssignments);
> ```
> The old topic's partitions remain in the consumer assignment permanently. The
> Enumerator detects the removed partitions in `getPartitionChange()` but does
> nothing with them (`// TODO: Handle removed partitions.`).
> *Bug 2 — New topic consumed from earliest instead of configured `latest()`*
> Even though the job is configured with
> `.setStartingOffsets(OffsetsInitializer.latest())`, the new topic (`topic-B`)
> is consumed from the earliest available offset.
> Log evidence:
> ```
> INFO KafkaSourceEnumerator - Assigning splits to readers
> {57=[[Partition: topic-B-0, StartingOffset: -2, StoppingOffset:
> -9223372036854775808], ...]}
> ```
> Sentinel `-2` = `EARLIEST_OFFSET`, not `-1` (`LATEST_OFFSET`) as configured.
> Root cause: `setStartingOffsets()` only takes effect for truly new partitions
> discovered by the Enumerator. During Savepoint restore, the Reader State
> injects old splits directly into the Reader via `SourceOperator.open() →
> sourceReader.addSplits(restoredSplits)`, bypassing `setStartingOffsets()`
> entirely. The default `startingOffsetsInitializer` in `KafkaSourceBuilder`
> (line 106) is `OffsetsInitializer.earliest()`, and for partitions not found
> in Reader State this default is used instead of the user-configured value.
> *Expected behavior*
> *For Bug 1:* When `getPartitionChange()` detects `removedPartitions`, the
> Enumerator should send a signal to the affected Readers to *unassign* those
> partitions from their `KafkaConsumer`, so the job only consumes from the
> newly subscribed topic.
> *For Bug 2:* When the Enumerator assigns splits for newly discovered
> partitions (i.e., partitions not present in any restored Reader State), the
> `startingOffset` in those splits should reflect the *user-configured*
> `setStartingOffsets()` value (e.g., `-1` for `latest()`), rather than falling
> back to the `KafkaSourceBuilder` default (`earliest()`). The user-configured
> initializer should take precedence over the builder default at all times,
> including during Savepoint restore.
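
For Bug 1, the difference between the merge-only update quoted above and a removal-aware update can be sketched with plain sets. This is an illustration only: partitions are modeled as strings, and `mergeOnly` and `mergeAndRemove` are hypothetical helpers, not methods of `KafkaPartitionSplitReader`.

```java
import java.util.Set;
import java.util.TreeSet;

public class AssignmentUpdateSketch {
    /** Mirrors the quoted snippet: new partitions are added, nothing is ever dropped. */
    static Set<String> mergeOnly(Set<String> current, Set<String> added) {
        Set<String> next = new TreeSet<>(added);
        next.addAll(current);
        return next;
    }

    /** Hypothetical fix: merge as before, then drop partitions reported as removed. */
    static Set<String> mergeAndRemove(Set<String> current, Set<String> added, Set<String> removed) {
        Set<String> next = mergeOnly(current, added);
        next.removeAll(removed);
        return next;
    }

    public static void main(String[] args) {
        Set<String> restored = Set.of("topic-A-0");   // from the restored Reader State
        Set<String> discovered = Set.of("topic-B-0"); // from the Enumerator
        System.out.println(mergeOnly(restored, discovered));                // [topic-A-0, topic-B-0]
        System.out.println(mergeAndRemove(restored, discovered, restored)); // [topic-B-0]
    }
}
```

In the real connector the removed set would have to be delivered from the Enumerator to the Readers; how that signal is sent is not covered by this sketch.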
--
This message was sent by Atlassian Jira
(v8.20.10#820010)