[ https://issues.apache.org/jira/browse/FLINK-39728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jim Hughes updated FLINK-39728:
-------------------------------
    Description: 
When `pauseOrResumeSplits` is issued from the mailbox thread, the set of splits it 
operates on reflects the mailbox thread's view *at the time the call is issued*.

By the time the SplitFetcher executes the action, a split may have already 
finished and been unassigned.

This can happen when the split-finished event has not yet propagated far enough 
to abort an in-flight alignment check on the `SourceOperator`. Calling 
`pause()` or `resume()` on a partition that is no longer in 
`consumer.assignment()` then throws `IllegalStateException`.
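The interleaving can be replayed sequentially in a small Java sketch. `StaleSnapshotRace` and `FakeConsumer` are hypothetical names, partitions are plain strings instead of Kafka's `TopicPartition`, and the fake mimics only the behavior that matters here: `pause()` throws `IllegalStateException`, with the same message the report quotes, for a partition outside the current assignment. No real threads are used; only the ordering of the steps is modeled.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class StaleSnapshotRace {
    // Replays the interleaving on one thread; only the ordering of the
    // three steps is modeled, not real concurrency.
    static String reproduce() {
        FakeConsumer consumer = new FakeConsumer();
        consumer.assign(List.of("topic-0", "topic-1"));

        // 1. Mailbox thread: captures the splits to pause when it issues the action.
        List<String> snapshot = List.of("topic-0", "topic-1");

        // 2. Fetcher thread: topic-1 finishes and is unassigned before the
        //    queued pause action runs.
        consumer.assign(List.of("topic-0"));

        // 3. Fetcher thread: the queued action pauses the stale snapshot.
        try {
            consumer.pause(snapshot);
            return null; // no failure observed
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduce());
    }
}

// HYPOTHETICAL stand-in for KafkaConsumer: partitions are plain strings, and
// pause() throws for a partition outside the current assignment.
class FakeConsumer {
    private final Set<String> assignment = new HashSet<>();
    private final Set<String> paused = new HashSet<>();

    void assign(Collection<String> partitions) {
        assignment.clear();
        assignment.addAll(partitions);
        paused.retainAll(assignment); // unassigned partitions are no longer tracked
    }

    void pause(Collection<String> partitions) {
        for (String tp : partitions) {
            if (!assignment.contains(tp)) {
                throw new IllegalStateException("No current assignment for partition " + tp);
            }
            paused.add(tp);
        }
    }
}
```

Running `main` prints `No current assignment for partition topic-1`: `topic-0` is still assigned and is paused, while `topic-1` was dropped between the snapshot and the pause.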

Generated-by: Claude Opus 4.6 (Anthropic)

  was:
KafkaPartitionSplitReader.pauseOrResumeSplits throws IllegalStateException on 
concurrently unassigned partitions

KafkaPartitionSplitReader.pauseOrResumeSplits() calls consumer.pause() and 
consumer.resume() with the full set of requested partitions without checking 
whether those partitions are still part of the consumer's current assignment.

Between the time the source reader decides which splits to pause/resume and the 
time pauseOrResumeSplits() executes, a partition can be unassigned by a 
concurrent fetch() call (which removes finished splits) or by 
removeEmptySplits(). When this happens, the Kafka consumer throws:

{code}
java.lang.IllegalStateException: No current assignment for partition <topic>-<partition>
{code}

This crashes the source reader even though the correct behavior is to silently 
skip partitions that are no longer assigned: the reader no longer fetches from 
such a partition, so pausing or resuming it should be a no-op.
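One way to get the skip-if-unassigned behavior the report asks for is to intersect the requested partitions with `consumer.assignment()` before calling `pause()`/`resume()`. The sketch assumes that approach; it is not necessarily what the merged fix does. `GuardedConsumer` and `pauseOrResumeAssigned` are hypothetical names standing in for `KafkaConsumer` and `KafkaPartitionSplitReader.pauseOrResumeSplits()`, and partitions are plain strings rather than `TopicPartition`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class GuardedPauseResume {
    // HYPOTHETICAL guard: partitions that are no longer assigned are dropped
    // before pause()/resume(), so a stale request becomes a no-op for them.
    static void pauseOrResumeAssigned(GuardedConsumer consumer,
                                      Collection<String> toPause,
                                      Collection<String> toResume) {
        Set<String> assigned = consumer.assignment();
        List<String> pausable = toPause.stream()
                .filter(assigned::contains)
                .collect(Collectors.toList());
        List<String> resumable = toResume.stream()
                .filter(assigned::contains)
                .collect(Collectors.toList());
        consumer.pause(pausable);
        consumer.resume(resumable);
    }

    public static void main(String[] args) {
        GuardedConsumer consumer = new GuardedConsumer();
        consumer.assign(List.of("topic-0", "topic-1"));
        // topic-1 finishes and is unassigned after the request was built.
        consumer.assign(List.of("topic-0"));
        // The stale request still names topic-1; the guard skips it.
        pauseOrResumeAssigned(consumer, List.of("topic-0", "topic-1"), List.of("topic-1"));
        System.out.println(consumer.paused());
    }
}

// HYPOTHETICAL stand-in for KafkaConsumer: pause() and resume() both throw
// for partitions outside the current assignment.
class GuardedConsumer {
    private final Set<String> assignment = new HashSet<>();
    private final Set<String> paused = new HashSet<>();

    void assign(Collection<String> partitions) {
        assignment.clear();
        assignment.addAll(partitions);
        paused.retainAll(assignment); // unassigned partitions are no longer tracked
    }

    Set<String> assignment() {
        return new HashSet<>(assignment);
    }

    Set<String> paused() {
        return new HashSet<>(paused);
    }

    void pause(Collection<String> partitions) {
        for (String tp : partitions) {
            requireAssigned(tp);
            paused.add(tp);
        }
    }

    void resume(Collection<String> partitions) {
        for (String tp : partitions) {
            requireAssigned(tp);
            paused.remove(tp);
        }
    }

    private void requireAssigned(String tp) {
        if (!assignment.contains(tp)) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
    }
}
```

Running `main` prints `[topic-0]`. This check-then-act is only sound if the assignment cannot change between `assignment()` and `pause()`/`resume()`. Because `KafkaConsumer` is not safe for multi-threaded access, these calls are expected to run on the same single fetcher thread that adds and removes splits, which rules out an interleaving inside the guarded call itself.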


> KafkaPartitionSplitReader.pauseOrResumeSplits throws IllegalStateException on 
> concurrently unassigned partitions
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39728
>                 URL: https://issues.apache.org/jira/browse/FLINK-39728
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Jim Hughes
>            Assignee: Jim Hughes
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: kafka-5.1.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
