[ 
https://issues.apache.org/jira/browse/KAFKA-12487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17319412#comment-17319412
 ] 

Chris Egerton commented on KAFKA-12487:
---------------------------------------

Based on the recent 
[KIP-726|https://lists.apache.org/thread.html/%3CCAFLS_9gOHobj-had=7SVBibJTxfzKCuf7WM+shTL_aTvSeyq=g...@mail.gmail.com%3E]
 discussion, we'll also want to implement the 
[ConsumerRebalanceListener::onPartitionsLost|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsLost-java.util.Collection-]
 method to avoid task failures caused by offset commit failures if/when 
the consumer rebalance protocol is automatically downgraded from 
{{COOPERATIVE}} to {{EAGER}}.
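
As a rough illustration of the difference between revoked and lost partitions, here is a minimal sketch of a listener that handles incremental callbacks. It is not the framework's implementation and deliberately avoids the Kafka client library so that it stays self-contained: the class name {{IncrementalAssignmentTracker}}, the use of plain strings for topic partitions, and the {{finalCommitAttempted}} helper are all hypothetical. Only the three callback names mirror {{ConsumerRebalanceListener}}.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a rebalance listener. Partitions are modelled as
// strings such as "topic-0" instead of Kafka's TopicPartition (an assumption
// made to keep the sketch free of external dependencies).
class IncrementalAssignmentTracker {
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> finalCommits = new HashSet<>();

    // With cooperative rebalancing only the newly added partitions are passed
    // in, so they are merged into the existing assignment, not swapped for it.
    public void onPartitionsAssigned(Collection<String> partitions) {
        assigned.addAll(partitions);
    }

    // With cooperative rebalancing only some partitions may be revoked, so
    // just those are removed. They are still owned at this point, so a final
    // offset commit for them is recorded as attempted.
    public void onPartitionsRevoked(Collection<String> partitions) {
        finalCommits.addAll(partitions);
        assigned.removeAll(partitions);
    }

    // Lost partitions may already belong to another consumer, so no commit is
    // attempted for them; they are only dropped from the local view.
    public void onPartitionsLost(Collection<String> partitions) {
        assigned.removeAll(partitions);
    }

    // Returns a copy of the partitions currently believed to be assigned.
    public Set<String> currentAssignment() {
        return new HashSet<>(assigned);
    }

    // Returns a copy of the partitions for which a final commit was attempted.
    public Set<String> finalCommitAttempted() {
        return new HashSet<>(finalCommits);
    }
}
```

The point of the sketch is that a commit is attempted on revocation but skipped on loss, which is the distinction that should keep a failed commit from failing the task.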

> Sink connectors do not work with the cooperative consumer rebalance protocol
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-12487
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12487
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.0.0, 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> The {{ConsumerRebalanceListener}} used by the framework to respond to 
> rebalance events in consumer groups for sink tasks is hard-coded with the 
> assumption that the consumer performs rebalances eagerly. In other words, it 
> assumes that whenever {{onPartitionsRevoked}} is called, all partitions have 
> been revoked from that consumer, and whenever {{onPartitionsAssigned}} is 
> called, the partitions passed in to that method comprise the complete set of 
> topic partitions assigned to that consumer.
> See the [WorkerSinkTask.HandleRebalance 
> class|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L730]
>  for the specifics.
>  
> One issue this can cause is silently ignoring to-be-committed offsets 
> provided by sink tasks, since the framework ignores offsets provided by tasks 
> in their {{preCommit}} method if it does not believe that the consumer for 
> that task is currently assigned the topic partition for that offset. See 
> these lines in the [WorkerSinkTask::commitOffsets 
> method|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L429-L430]
>  for reference.
>  
> This may not be the only issue caused by configuring a sink connector's 
> consumer to use cooperative rebalancing. Rigorous unit and integration 
> testing should be added before claiming that the Connect framework supports 
> the use of cooperative consumers with sink connectors.
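
To make the dropped-offsets problem from the description concrete, here is a minimal sketch of the filtering step. It is an assumption-laden model, not the code in {{WorkerSinkTask}}: the {{OffsetFilter}} class and its {{committable}} method are hypothetical, and topic partitions are represented as plain strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper that models how offsets returned by a task's preCommit
// could be filtered against the assignment the framework believes it has.
final class OffsetFilter {
    private OffsetFilter() {}

    // Keeps only the offsets whose partition is in the believed assignment;
    // everything else is dropped without raising an error.
    static Map<String, Long> committable(Map<String, Long> taskProvided,
                                         Set<String> believedAssignment) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : taskProvided.entrySet()) {
            if (believedAssignment.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

If the believed assignment was overwritten by an incremental {{onPartitionsAssigned}} call (say it now holds only a newly added partition), offsets for partitions the consumer still owns would be filtered out here without any error.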



--
This message was sent by Atlassian Jira
(v8.3.4#803005)