sherlock-lin created FLINK-39435:
------------------------------------

             Summary: [flink-connector-kafka] Switching Kafka topics via 
Savepoint restore causes dual-topic consumption AND new topic consumed from 
earliest instead of configured offset
                 Key: FLINK-39435
                 URL: https://issues.apache.org/jira/browse/FLINK-39435
             Project: Flink
          Issue Type: Bug
         Environment: - Flink version: 1.17.0
            Reporter: sherlock-lin


In `KafkaSourceEnumerator.handlePartitionSplitChanges()`, there is a 
long-standing `TODO` comment:

```java
// TODO: Handle removed partitions.
addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);
assignPendingPartitionSplits(context.registeredReaders().keySet());
```
 

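When a job is restored from a Savepoint after its Kafka source has been switched from `topic-A` to `topic-B`, two separate bugs are triggered simultaneously: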

**Bug 1 — Dual-topic consumption (old topic is never stopped)**

The job consumes from **both `topic-A` and `topic-B` simultaneously**:

- `topic-A` splits come from the **Reader State** restored from Savepoint 
(`SourceOperator.open()` → `sourceReader.addSplits(restoredSplits)`). 
- `topic-B` splits come from the **Enumerator** discovering new partitions 
(`initializePartitionSplits()` → `assignSplits()`). 
- In `KafkaPartitionSplitReader.handleSplitsChanges()`, the new assignment is merged with the consumer's existing assignment rather than replacing it:
```java
newPartitionAssignments.addAll(consumer.assignment()); // merges, does NOT replace
consumer.assign(newPartitionAssignments);
```
The old topic's partitions remain in the consumer assignment permanently. The 
Enumerator detects the removed partitions in `getPartitionChange()` but does 
nothing with them (`// TODO: Handle removed partitions.`).
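
The merge behavior described above can be illustrated with a plain-JDK model. This is only a sketch: partitions are represented as strings, and `AssignmentMergeSketch`, `mergeAssignment`, and `mergeWithoutRemoved` are hypothetical names that do not exist in the connector. It shows why restored `topic-A` partitions survive the merge, and what the assignment would look like if removed partitions were dropped.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-JDK model of the assignment logic; partition names are illustrative strings.
class AssignmentMergeSketch {

    // Mirrors the reported behavior: the existing assignment is added to the new one.
    static Set<String> mergeAssignment(Set<String> current, List<String> newSplits) {
        Set<String> next = new LinkedHashSet<>(newSplits);
        next.addAll(current); // restored topic-A partitions are carried over
        return next;
    }

    // Hypothetical alternative: also drop partitions reported as removed.
    static Set<String> mergeWithoutRemoved(
            Set<String> current, List<String> newSplits, Set<String> removed) {
        Set<String> next = mergeAssignment(current, newSplits);
        next.removeAll(removed);
        return next;
    }

    public static void main(String[] args) {
        Set<String> restored = new LinkedHashSet<>(List.of("topic-A-0", "topic-A-1"));
        List<String> discovered = List.of("topic-B-0");

        // Both topics end up in the assignment.
        System.out.println(mergeAssignment(restored, discovered));
        // Only the new topic remains once removed partitions are dropped.
        System.out.println(mergeWithoutRemoved(restored, discovered, restored));
    }
}
```

In the real reader the result would be passed to `consumer.assign(...)`; the model only makes the set arithmetic visible.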

**Bug 2 — New topic consumed from earliest instead of configured `latest()`**

Even though the job is configured with 
`.setStartingOffsets(OffsetsInitializer.latest())`, the new topic (`topic-B`) 
is consumed from the **earliest** available offset.

Log evidence:
```
INFO KafkaSourceEnumerator - Assigning splits to readers
{57=[[Partition: topic-B-0, StartingOffset: -2, StoppingOffset: 
-9223372036854775808], ...]}
```
Sentinel `-2` = `EARLIEST_OFFSET`, not `-1` (`LATEST_OFFSET`) as configured.
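
For readers decoding the log line, the sentinel values can be spelled out as a small lookup. This is a sketch, not the connector's code: `OffsetSentinelSketch` and `describeStart` are hypothetical, the `-1` and `-2` names are taken from the sentence above, and the name `NO_STOPPING_OFFSET` for the stopping value is an assumption (the only certain fact is that `-9223372036854775808` equals `Long.MIN_VALUE`).

```java
// Sentinel values as they appear in the log; names follow the issue text.
class OffsetSentinelSketch {
    static final long LATEST_OFFSET = -1L;
    static final long EARLIEST_OFFSET = -2L;
    // Assumed name; the value is Long.MIN_VALUE, as printed in the log.
    static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;

    // Turns a raw StartingOffset value into a readable label.
    static String describeStart(long offset) {
        if (offset == EARLIEST_OFFSET) {
            return "EARLIEST_OFFSET";
        }
        if (offset == LATEST_OFFSET) {
            return "LATEST_OFFSET";
        }
        return offset >= 0 ? "specific offset " + offset : "unrecognized sentinel " + offset;
    }

    public static void main(String[] args) {
        System.out.println(describeStart(-2L)); // value seen in the log
        System.out.println(describeStart(-1L)); // value expected from latest()
    }
}
```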

Root cause: `setStartingOffsets()` only takes effect for **truly new 
partitions** discovered by the Enumerator. During Savepoint restore, the Reader 
State injects old splits directly into the Reader via `SourceOperator.open() → 
sourceReader.addSplits(restoredSplits)`, bypassing `setStartingOffsets()` 
entirely. The default `startingOffsetsInitializer` in `KafkaSourceBuilder` 
(line 106) is `OffsetsInitializer.earliest()`, and for partitions not found in 
Reader State this default is used instead of the user-configured value.

**Expected behavior**

When `getPartitionChange()` detects `removedPartitions`, the Enumerator should 
send a signal to the affected Readers to **unassign** those partitions from 
their `KafkaConsumer`, so the job only consumes from the newly subscribed topic.
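
A minimal sketch of that expected behavior, modeled with plain JDK collections. Everything here is hypothetical (`RemovedPartitionSketch`, `removedPartitions`, `unassignSignals`, string partition names, integer reader IDs); how the signal would actually be delivered to a Reader is left open. It shows only the two steps: computing removed partitions as a set difference, and grouping them by the Reader that currently owns them.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-JDK model; not the connector's API.
class RemovedPartitionSketch {

    // Partitions that were assigned earlier but are absent from the latest discovery.
    static Set<String> removedPartitions(Set<String> assigned, Set<String> fetched) {
        Set<String> removed = new TreeSet<>(assigned);
        removed.removeAll(fetched);
        return removed;
    }

    // For each reader, the subset of its partitions that should be unassigned.
    static Map<Integer, Set<String>> unassignSignals(
            Map<Integer, Set<String>> assignmentByReader, Set<String> removed) {
        Map<Integer, Set<String>> signals = new TreeMap<>();
        for (Map.Entry<Integer, Set<String>> entry : assignmentByReader.entrySet()) {
            Set<String> toDrop = new TreeSet<>(entry.getValue());
            toDrop.retainAll(removed);
            if (!toDrop.isEmpty()) {
                signals.put(entry.getKey(), toDrop);
            }
        }
        return signals;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> byReader = new TreeMap<>();
        byReader.put(0, Set.of("topic-A-0"));
        byReader.put(1, Set.of("topic-A-1", "topic-B-0"));

        Set<String> assigned = Set.of("topic-A-0", "topic-A-1", "topic-B-0");
        Set<String> fetched = Set.of("topic-B-0"); // only the new topic is subscribed

        Set<String> removed = removedPartitions(assigned, fetched);
        System.out.println(unassignSignals(byReader, removed));
    }
}
```

Readers that own no removed partitions receive no signal in this model, so a Reader consuming only `topic-B` would be left untouched.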



