Toshiki Murakami created KAFKA-19685:
----------------------------------------
Summary: Revoked partitions are included in currentOffsets passed
to preCommit on task stop
Key: KAFKA-19685
URL: https://issues.apache.org/jira/browse/KAFKA-19685
Project: Kafka
Issue Type: Bug
Components: connect
Affects Versions: 3.9.0
Reporter: Toshiki Murakami
When a task stops,
[{{WorkerSinkTask#closeAllPartitions()}}|https://github.com/apache/kafka/blob/84caaa6e9da06435411510a81fa321d4f99c351f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L664]
runs and the [task’s {{preCommit}} is
invoked|https://github.com/apache/kafka/blob/3c7f99ad31397a6a7a4975d058891f236f37d02d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L444].
At that time, the {{currentOffsets}} argument may include *revoked
partitions*.
This appears to be caused by:
* {{WorkerSinkTask}} does not remove revoked partitions from {{currentOffsets}}.
* {{WorkerSinkTask#closeAllPartitions()}} passes its {{currentOffsets}} to
{{SinkTask#preCommit(...)}} _as-is_ (i.e., without filtering).
During normal iterations, {{SinkTask#preCommit(...)}} receives offsets only for
the partitions in {{KafkaConsumer#assignment()}}, so revoked partitions are *not* included.
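The difference between the two call paths described above can be modeled with a small, self-contained sketch. This is not the actual {{WorkerSinkTask}} code: the class name {{PreCommitOffsetsSketch}} and both helper methods are hypothetical, partitions are represented as plain strings such as "orders-0" instead of {{TopicPartition}}, and offsets are plain {{Long}} values instead of {{OffsetAndMetadata}}. It only illustrates why a stale entry in {{currentOffsets}} would reach {{preCommit}} when the map is passed without filtering.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the two call paths; not taken from the Kafka code base.
class PreCommitOffsetsSketch {

    // Models the normal-iteration path: only offsets for currently
    // assigned partitions are handed to preCommit.
    static Map<String, Long> filteredByAssignment(Map<String, Long> currentOffsets,
                                                  Set<String> assignment) {
        Map<String, Long> view = new HashMap<>(currentOffsets);
        view.keySet().retainAll(assignment); // drop entries for revoked partitions
        return view;
    }

    // Models the task-stop path as reported: the map is copied as-is,
    // so any entry that was never removed is still present.
    static Map<String, Long> unfiltered(Map<String, Long> currentOffsets) {
        return new HashMap<>(currentOffsets);
    }

    public static void main(String[] args) {
        Map<String, Long> currentOffsets = new HashMap<>();
        currentOffsets.put("orders-0", 42L);
        currentOffsets.put("orders-1", 7L);

        // Assume "orders-1" was revoked in a rebalance but its entry
        // was left behind in currentOffsets.
        Set<String> assignment = Set.of("orders-0");

        Map<String, Long> normal = filteredByAssignment(currentOffsets, assignment);
        Map<String, Long> onStop = unfiltered(currentOffsets);

        System.out.println(normal.containsKey("orders-1")); // false
        System.out.println(onStop.containsKey("orders-1")); // true
    }
}
```

Under these assumptions, a task implementation would see "orders-1" only in the stop-time call, which matches the inconsistency reported here.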
Having revoked partitions included *only* at stop is confusing. If
this behavior is intended, could we add a brief note to the
{{SinkTask#preCommit(...)}} Javadoc to document it?