[ 
https://issues.apache.org/jira/browse/FLINK-39435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18077201#comment-18077201
 ] 

Hongshun Wang commented on FLINK-39435:
---------------------------------------

>the Enumerator assigns splits for newly discovered partitions (i.e., 
>partitions not present in any restored Reader State), the `startingOffset` in 
>those splits should reflect the *user-configured* `setStartingOffsets()` 
>value (e.g., `-1` for `latest()`)

 

This is by design in FLIP-288, to avoid data loss:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
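
To illustrate, here is a minimal sketch of how I read that design; it is not
the connector's actual code. The user-configured initializer applies only to
partitions found during the initial discovery. Partitions discovered later
start from earliest, so records written before the partition was discovered
are not skipped. Assuming the enumerator state restored from the Savepoint
marks initial discovery as finished, `topic-B` partitions fall into the
second case. The class and method names are hypothetical; only the sentinels
`-1` and `-2` come from the report.

```java
// Hypothetical sketch, not KafkaSourceEnumerator code.
class StartingOffsetSketch {
    static final long LATEST_OFFSET = -1L;   // sentinel quoted in the report
    static final long EARLIEST_OFFSET = -2L; // sentinel quoted in the report

    /**
     * Picks the starting offset sentinel for a newly assigned partition.
     *
     * @param initialDiscoveryFinished assumed flag: true once the first discovery round is done
     * @param userConfigured sentinel derived from the user's setStartingOffsets() choice
     */
    static long startingOffsetFor(boolean initialDiscoveryFinished, long userConfigured) {
        // Later-discovered partitions start from earliest to avoid skipping data.
        return initialDiscoveryFinished ? EARLIEST_OFFSET : userConfigured;
    }
}
```

With `latest()` configured, a fresh start yields `-1`, while a partition
discovered after initial discovery yields `-2`, which would match the log in
the report.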

> [flink-connector-kafka] Switching Kafka topics via Savepoint restore causes 
> dual-topic consumption AND new topic consumed from earliest instead of 
> configured offset
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39435
>                 URL: https://issues.apache.org/jira/browse/FLINK-39435
>             Project: Flink
>          Issue Type: Bug
>         Environment: - Flink version: 1.17.0
>            Reporter: sherlock-lin
>            Priority: Major
>              Labels: bug, flink-connector-kafka
>         Attachments: image-2026-04-14-17-15-57-350.png, 
> image-2026-04-14-17-16-33-680.png, image-2026-04-30-11-37-02-642.png
>
>
> In `KafkaSourceEnumerator.handlePartitionSplitChanges()`, there is a 
> long-standing `TODO` comment:
> !image-2026-04-14-17-16-33-680.png!
>  
> Two separate bugs are triggered simultaneously:
> *Bug 1 — Dual-topic consumption (old topic is never stopped)*
> The job consumes from both `topic-A` and `topic-B` simultaneously:
>  - `topic-A` splits come from the Reader State restored from Savepoint 
> (`SourceOperator.open()` → `sourceReader.addSplits(restoredSplits)`).
>  - `topic-B` splits come from the Enumerator discovering new partitions 
> (`initializePartitionSplits()` → `assignSplits()`).
>  - In `KafkaPartitionSplitReader.handleSplitsChanges()`, line:
> ```java
> // Merges with the existing assignment; does NOT replace it.
> newPartitionAssignments.addAll(consumer.assignment());
> consumer.assign(newPartitionAssignments);
> ```
> The old topic's partitions remain in the consumer assignment permanently. The 
> Enumerator detects the removed partitions in `getPartitionChange()` but does 
> nothing with them (`// TODO: Handle removed partitions.`).
> *Bug 2 — New topic consumed from earliest instead of configured `latest()`*
> Even though the job is configured with 
> `.setStartingOffsets(OffsetsInitializer.latest())`, the new topic (`topic-B`) 
> is consumed from the earliest available offset.
> Log evidence:
> ```
> INFO KafkaSourceEnumerator - Assigning splits to readers
> {57=[[Partition: topic-B-0, StartingOffset: -2, StoppingOffset: 
> -9223372036854775808], ...]}
> ```
> Sentinel `-2` = `EARLIEST_OFFSET`, not `-1` (`LATEST_OFFSET`) as configured.
> Root cause: `setStartingOffsets()` only takes effect for truly new partitions 
> discovered by the Enumerator. During Savepoint restore, the Reader State 
> injects old splits directly into the Reader via `SourceOperator.open() → 
> sourceReader.addSplits(restoredSplits)`, bypassing `setStartingOffsets()` 
> entirely. The default `startingOffsetsInitializer` in `KafkaSourceBuilder` 
> (line 106) is `OffsetsInitializer.earliest()`, and for partitions not found 
> in Reader State this default is used instead of the user-configured value.
> *Expected behavior*
> **For Bug 1:** When `getPartitionChange()` detects `removedPartitions`, the 
> Enumerator should send a signal to the affected Readers to **unassign** those 
> partitions from their `KafkaConsumer`, so the job only consumes from the 
> newly subscribed topic.
> **For Bug 2:** When the Enumerator assigns splits for newly discovered 
> partitions (i.e., partitions not present in any restored Reader State), the 
> `startingOffset` in those splits should reflect the **user-configured** 
> `setStartingOffsets()` value (e.g., `-1` for `latest()`), rather than falling 
> back to the `KafkaSourceBuilder` default (`earliest()`). The user-configured 
> initializer should take precedence over the builder default at all times, 
> including during Savepoint restore.
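
On Bug 1, the effect of the merge quoted above can be shown with plain sets,
without a Kafka dependency. This is only a sketch: partitions are modelled as
strings, and `mergeWithoutRemoved` is a hypothetical illustration of the
requested behavior, not an existing connector method.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch; partitions are modelled as plain strings.
class AssignmentMergeSketch {
    // Mirrors the quoted logic: new partitions are unioned with the current assignment.
    static Set<String> merge(Set<String> current, Set<String> added) {
        Set<String> next = new LinkedHashSet<>(added);
        next.addAll(current);
        return next;
    }

    // Assumed alternative: also drop the partitions reported as removed.
    static Set<String> mergeWithoutRemoved(
            Set<String> current, Set<String> added, Set<String> removed) {
        Set<String> next = merge(current, added);
        next.removeAll(removed);
        return next;
    }
}
```

With `topic-A-0` restored from Reader State and `topic-B-0` newly assigned,
`merge` keeps both partitions, while `mergeWithoutRemoved` leaves only
`topic-B-0` when `topic-A-0` is reported as removed.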



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
