sherlock-lin created FLINK-39435:
------------------------------------
Summary: [flink-connector-kafka] Switching Kafka topics via Savepoint restore causes dual-topic consumption AND new topic consumed from earliest instead of configured offset
Key: FLINK-39435
URL: https://issues.apache.org/jira/browse/FLINK-39435
Project: Flink
Issue Type: Bug
Environment: Flink version 1.17.0
Reporter: sherlock-lin
In `KafkaSourceEnumerator.handlePartitionSplitChanges()`, there is a
long-standing `TODO` comment:
```java
// TODO: Handle removed partitions.
addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);
assignPendingPartitionSplits(context.registeredReaders().keySet());
```
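When a job whose source subscription is changed from `topic-A` to `topic-B` is restored from a Savepoint, two separate bugs are triggered simultaneously: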
**Bug 1 — Dual-topic consumption (old topic is never stopped)**
The job consumes from **both `topic-A` and `topic-B` simultaneously**:
- `topic-A` splits come from the **Reader State** restored from Savepoint
(`SourceOperator.open()` → `sourceReader.addSplits(restoredSplits)`).
- `topic-B` splits come from the **Enumerator** discovering new partitions
(`initializePartitionSplits()` → `assignSplits()`).
- In `KafkaPartitionSplitReader.handleSplitsChanges()`, newly added splits are merged into the existing assignment:
```java
newPartitionAssignments.addAll(consumer.assignment()); // merges, does NOT replace
consumer.assign(newPartitionAssignments);
```
As a result, the old topic's partitions remain in the consumer's assignment permanently. The Enumerator does detect the removed partitions in `getPartitionChange()`, but does nothing with them (`// TODO: Handle removed partitions.`).
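To make the merge-only behaviour concrete, here is a minimal, dependency-free sketch of the pattern quoted above. It is a simplified model, not the connector's code: partitions are plain `"topic-partition"` strings instead of Kafka's `TopicPartition`, and the class and method names are hypothetical. Because the next assignment is always a union with the current one, the restored `topic-A` partitions can never leave it.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical model of the reported merge in handleSplitsChanges():
 * newly added splits are unioned with the current consumer assignment,
 * so the assignment can only grow.
 */
public class AssignmentMergeDemo {

    /** Mirrors newPartitionAssignments.addAll(consumer.assignment()). */
    public static Set<String> mergeAssignment(Set<String> currentAssignment, Set<String> newSplits) {
        Set<String> next = new HashSet<>(newSplits);
        next.addAll(currentAssignment); // old partitions are kept unconditionally
        return next;
    }

    public static void main(String[] args) {
        // topic-A partitions restored from Reader State are already assigned.
        Set<String> current = Set.of("topic-A-0", "topic-A-1");
        // A topic-B partition arrives later from the Enumerator.
        Set<String> fromEnumerator = Set.of("topic-B-0");

        Set<String> next = mergeAssignment(current, fromEnumerator);
        System.out.println("topic-A still assigned: " + next.contains("topic-A-0")); // prints "topic-A still assigned: true"
        System.out.println("assignment size: " + next.size()); // prints "assignment size: 3"
    }
}
```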
**Bug 2 — New topic consumed from earliest instead of configured `latest()`**
Even though the job is configured with
`.setStartingOffsets(OffsetsInitializer.latest())`, the new topic (`topic-B`)
is consumed from the **earliest** available offset.
Log evidence:
```
INFO KafkaSourceEnumerator - Assigning splits to readers {57=[[Partition: topic-B-0, StartingOffset: -2, StoppingOffset: -9223372036854775808], ...]}
```
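The starting-offset sentinel `-2` is `EARLIEST_OFFSET`, not `-1` (`LATEST_OFFSET`) as configured. The stopping offset `-9223372036854775808` (`Long.MIN_VALUE`) is `NO_STOPPING_OFFSET`, i.e. the split is unbounded.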
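When reading these log lines it helps to decode the sentinels mechanically. The sketch below redeclares the marker values as they appear in Flink's `KafkaPartitionSplit` (`LATEST_OFFSET = -1`, `EARLIEST_OFFSET = -2`, `COMMITTED_OFFSET = -3`, `NO_STOPPING_OFFSET = Long.MIN_VALUE`) so it runs without Flink on the classpath; the `OffsetSentinels` class and `describe` helper are hypothetical.

```java
/**
 * Hypothetical helper that decodes StartingOffset/StoppingOffset values
 * printed in "Assigning splits to readers" log lines. The constants mirror
 * the markers declared in Flink's KafkaPartitionSplit.
 */
public class OffsetSentinels {
    public static final long LATEST_OFFSET = -1L;
    public static final long EARLIEST_OFFSET = -2L;
    public static final long COMMITTED_OFFSET = -3L;
    public static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;

    /** Returns a readable meaning for a logged offset value. */
    public static String describe(long offset) {
        if (offset == LATEST_OFFSET) return "LATEST_OFFSET";
        if (offset == EARLIEST_OFFSET) return "EARLIEST_OFFSET";
        if (offset == COMMITTED_OFFSET) return "COMMITTED_OFFSET";
        if (offset == NO_STOPPING_OFFSET) return "NO_STOPPING_OFFSET";
        return offset >= 0 ? "explicit offset " + offset : "unknown sentinel " + offset;
    }

    public static void main(String[] args) {
        // Values taken from the topic-B-0 log line in this report.
        System.out.println("StartingOffset -2 -> " + describe(-2L)); // prints "StartingOffset -2 -> EARLIEST_OFFSET"
        System.out.println("StoppingOffset -> " + describe(-9223372036854775808L)); // prints "StoppingOffset -> NO_STOPPING_OFFSET"
    }
}
```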
Root cause: `setStartingOffsets()` only takes effect for **truly new partitions** discovered by the Enumerator. During Savepoint restore, the old splits stored in Reader State are injected directly into the Reader via `SourceOperator.open() → sourceReader.addSplits(restoredSplits)`, bypassing `setStartingOffsets()` entirely. The default `startingOffsetsInitializer` in `KafkaSourceBuilder` (line 106) is `OffsetsInitializer.earliest()`, and for partitions not found in Reader State this default is used instead of the user-configured value.
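The two paths described in the root cause can be modelled as a small resolution function: partitions present in the restored Reader State keep their checkpointed offset, and every other partition gets whatever the effective initializer returns. This is a simplified sketch of the behaviour as reported, not the connector's implementation; the `StartingOffsetModel` class, the `resolve` method, and the checkpointed offset `1234` are hypothetical, and a `ToLongFunction` stands in for an `OffsetsInitializer`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

/** Hypothetical model of starting-offset resolution as described in this report. */
public class StartingOffsetModel {
    static final long LATEST_OFFSET = -1L;
    static final long EARLIEST_OFFSET = -2L;

    /** Restored partitions keep their state offset; others use the effective initializer. */
    public static Map<String, Long> resolve(
            Set<String> partitions,
            Map<String, Long> restoredReaderState,
            ToLongFunction<String> effectiveInitializer) {
        Map<String, Long> result = new HashMap<>();
        for (String p : partitions) {
            Long restored = restoredReaderState.get(p);
            result.put(p, restored != null ? restored : effectiveInitializer.applyAsLong(p));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> readerState = Map.of("topic-A-0", 1234L); // hypothetical checkpointed offset
        ToLongFunction<String> builderDefault = p -> EARLIEST_OFFSET; // earliest(), per the report

        Map<String, Long> offsets = resolve(Set.of("topic-A-0", "topic-B-0"), readerState, builderDefault);
        System.out.println("topic-A-0 -> " + offsets.get("topic-A-0")); // prints "topic-A-0 -> 1234"
        System.out.println("topic-B-0 -> " + offsets.get("topic-B-0")); // prints "topic-B-0 -> -2"
    }
}
```

If the user-configured `latest()` were the effective initializer for `topic-B-0`, the same model would yield `-1`, matching the behaviour the reporter expected.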
**Expected behavior**
When `getPartitionChange()` detects `removedPartitions`, the Enumerator should
send a signal to the affected Readers to **unassign** those partitions from
their `KafkaConsumer`, so the job only consumes from the newly subscribed topic.
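The expected behaviour amounts to three steps: compute removed partitions (previously assigned but no longer discovered), route them to the reader that owns each one, and have that reader unassign them rather than merge. The sketch below models these steps with plain collections. All names are hypothetical and do not exist in flink-connector-kafka; reader id `57` is borrowed from the log line, and the owner mapping is invented for illustration. In Flink, one plausible carrier for the per-reader signal would be a custom `SourceEvent` sent through `SplitEnumeratorContext.sendEventToSourceReader(subtaskId, event)`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of routing removed partitions to their owning readers. */
public class RemovedPartitionRouting {

    /** Removed = previously assigned but no longer returned by partition discovery. */
    public static Set<String> removedPartitions(Set<String> assigned, Set<String> discovered) {
        Set<String> removed = new HashSet<>(assigned);
        removed.removeAll(discovered);
        return removed;
    }

    /** Groups removed partitions by owning reader (subtask id): one signal per reader. */
    public static Map<Integer, Set<String>> routeToOwners(Set<String> removed, Map<String, Integer> owner) {
        Map<Integer, Set<String>> perReader = new HashMap<>();
        for (String p : removed) {
            Integer reader = owner.get(p);
            if (reader != null) {
                perReader.computeIfAbsent(reader, k -> new HashSet<>()).add(p);
            }
        }
        return perReader;
    }

    /** Reader side: drop the signalled partitions from the consumer assignment. */
    public static Set<String> unassign(Set<String> consumerAssignment, Set<String> toRemove) {
        Set<String> next = new HashSet<>(consumerAssignment);
        next.removeAll(toRemove);
        return next;
    }

    public static void main(String[] args) {
        Set<String> assigned = Set.of("topic-A-0", "topic-A-1");
        Set<String> discovered = Set.of("topic-B-0");
        Map<String, Integer> owner = Map.of("topic-A-0", 57, "topic-A-1", 58);

        Map<Integer, Set<String>> signals = routeToOwners(removedPartitions(assigned, discovered), owner);
        // Reader 57 currently holds topic-A-0 (restored) and topic-B-0 (newly assigned).
        Set<String> reader57 = unassign(Set.of("topic-A-0", "topic-B-0"), signals.get(57));
        System.out.println(reader57); // prints [topic-B-0]
    }
}
```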
--
This message was sent by Atlassian Jira
(v8.20.10#820010)