[ 
https://issues.apache.org/jira/browse/KAFKA-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-12857.
-----------------------------------
      Assignee:     (was: dgd_contributor)
    Resolution: Duplicate

> Using Connect Sink with CooperativeStickyAssignor results in commit offsets 
> failure
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-12857
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12857
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.7.1
>         Environment: Linux
>            Reporter: Oliver Hsu
>            Priority: Major
>
> We are attempting to use a Kafka Connect Sink Connector with the 
> {{CooperativeStickyAssignor}} assignment strategy. With this assignor, 
> offset commits sometimes fail with the following warning:
> {{[2021-05-26 22:03:36,435] WARN WorkerSinkTask\{id=sink-connector-7} 
> Ignoring invalid task provided offset 
> mytopic-0/OffsetAndMetadata\{offset=16305575, leaderEpoch=null, metadata=''} 
> -- partition not assigned, assignment=[mytopic-0] 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:434)}}
> Note that the partition rejected as "not assigned" in the warning appears in 
> the assignment printed by that same message.
> *Config changes*
> {{consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor}}
> *Cooperative vs Eager Assignment Strategy background*
>  
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceListenerandConsumerPartitionAssignorSemantics]
> With eager assignment:
> {quote}Listener#onPartitionsAssigned: called on the full set of assigned 
> partitions (may have overlap with the partitions passed to 
> #onPartitionsRevoked).
> {quote}
> With cooperative assignment:
> {quote}Listener#onPartitionsAssigned: called on the subset of assigned 
> partitions that were not previously owned before this rebalance. There should 
> be no overlap with the revoked partitions (if any). This will always be 
> called, even if there are no new partitions being assigned to a given member.
> {quote}
> This means that with cooperative assignment, {{onPartitionsAssigned}} may be 
> called with a partial assignment or an empty collection.
> However, the 
> [WorkerSinkTask.HandleRebalance|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L680]
>  class assumes that {{onPartitionsAssigned}} is called with the full set of 
> assigned partitions, which is true for eager but not for cooperative 
> assignment.
> {code:java|title=WorkerSinkTask.HandleRebalance.java|borderStyle=solid}
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>             lastCommittedOffsets = new HashMap<>();
>             currentOffsets = new HashMap<>();
>             for (TopicPartition tp : partitions) {
>                 long pos = consumer.position(tp);
>                 lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>                 currentOffsets.put(tp, new OffsetAndMetadata(pos));
>                 log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
>             }
> {code}
> Each call to {{onPartitionsAssigned}} replaces {{lastCommittedOffsets}} and 
> {{currentOffsets}} with new empty {{HashMap}} instances and then adds only 
> the offsets of the {{partitions}} passed to that call.
> In the logs we see:
>  {{[2021-05-26 22:02:09,785] DEBUG WorkerSinkTask\{id=sink-connector-7} 
> Partitions assigned [myTopic-0] 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
>  {{[2021-05-26 22:02:13,063] DEBUG WorkerSinkTask\{id=sink-connector-7} 
> Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
>  {{[2021-05-26 22:02:16,074] DEBUG WorkerSinkTask\{id=sink-connector-7} 
> Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
> These logs show that, with {{CooperativeStickyAssignor}}, 
> {{onPartitionsAssigned}} is called first with the partition assigned to the 
> task, followed by additional calls with an empty {{partitions}} collection.
> Because every call to {{HandleRebalance.onPartitionsAssigned}} starts from 
> new maps, the later calls with empty collections discard the entry recorded 
> by the first call and leave {{lastCommittedOffsets}} as an empty 
> {{HashMap}}.
> Inside 
> [WorkerSinkTask.commitOffsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L415-L419],
>  the {{commitableOffsets}} map is initialized as a copy of 
> {{lastCommittedOffsets}}, which is an empty {{HashMap}}:
> {code:java|title=WorkerSinkTask.java|borderStyle=solid}
> private void commitOffsets(long now, boolean closing) {
> ...
>         final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
>         for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
>             final TopicPartition partition = taskProvidedOffsetEntry.getKey();
>             final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
>             if (commitableOffsets.containsKey(partition)) {
>                 long taskOffset = taskProvidedOffset.offset();
>                 long currentOffset = currentOffsets.get(partition).offset();
>                 if (taskOffset <= currentOffset) {
>                     commitableOffsets.put(partition, taskProvidedOffset);
>                 } else {
>                     log.warn("{} Ignoring invalid task provided offset {}/{} -- not yet consumed, taskOffset={} currentOffset={}",
>                             this, partition, taskProvidedOffset, taskOffset, currentOffset);
>                 }
>             } else {
>                 log.warn("{} Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}",
>                         this, partition, taskProvidedOffset, consumer.assignment());
>             }
>         }
> {code}
> {{if (commitableOffsets.containsKey(partition))}} is {{false}} because 
> {{commitableOffsets}} is an empty {{HashMap}} copied from the empty 
> {{lastCommittedOffsets}} {{HashMap}}. This causes the {{Ignoring invalid 
> task provided offset {}/{} -- partition not assigned, assignment={}}} warning 
> to be logged even though the task is assigned the partition.
>  e.g.
>  {{[2021-05-26 22:03:36,435] WARN WorkerSinkTask\{id=sink-connector-7} 
> Ignoring invalid task provided offset 
> mytopic-0/OffsetAndMetadata\{offset=16305575, leaderEpoch=null, metadata=''} 
> -- partition not assigned, assignment=[mytopic-0] 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:434)}}
> *Recommended Changes*
>  {{WorkerSinkTask.HandleRebalance.onPartitionsAssigned}} needs to handle the 
> cooperative assignment strategy when initializing {{lastCommittedOffsets}}: 
> it may be called with a subset of the assigned partitions, or with an empty 
> collection if no new partitions are being assigned to a given member.
>  
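
A minimal, self-contained sketch of the behavior described above, and of one possible way to handle incremental callbacks. It is not the actual Kafka patch and uses no Kafka classes: partitions are plain strings, offsets are plain longs, and the names OffsetTrackingSketch, ResettingTracker and IncrementalTracker are hypothetical. ResettingTracker mimics the map replacement quoted from HandleRebalance; IncrementalTracker assumes that merging on assignment and removing entries on revocation is an acceptable fix.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class OffsetTrackingSketch {

    // Mimics the quoted behavior: the map is replaced on every callback,
    // so a later callback with an empty collection wipes earlier entries.
    static class ResettingTracker {
        Map<String, Long> lastCommittedOffsets = new HashMap<>();

        void onPartitionsAssigned(Map<String, Long> newlyAssigned) {
            lastCommittedOffsets = new HashMap<>();
            lastCommittedOffsets.putAll(newlyAssigned);
        }
    }

    // Hypothetical alternative: treat each callback as an increment.
    // Newly assigned partitions are merged in; entries are dropped only
    // when the partitions are explicitly revoked.
    static class IncrementalTracker {
        final Map<String, Long> lastCommittedOffsets = new HashMap<>();

        void onPartitionsAssigned(Map<String, Long> newlyAssigned) {
            lastCommittedOffsets.putAll(newlyAssigned);
        }

        void onPartitionsRevoked(Collection<String> revoked) {
            lastCommittedOffsets.keySet().removeAll(revoked);
        }
    }

    public static void main(String[] args) {
        // Same callback sequence as in the DEBUG logs: one partition, then two empty calls.
        Map<String, Long> first = Collections.singletonMap("mytopic-0", 16305575L);
        Map<String, Long> empty = Collections.emptyMap();

        ResettingTracker resetting = new ResettingTracker();
        resetting.onPartitionsAssigned(first);
        resetting.onPartitionsAssigned(empty);
        resetting.onPartitionsAssigned(empty);

        IncrementalTracker incremental = new IncrementalTracker();
        incremental.onPartitionsAssigned(first);
        incremental.onPartitionsAssigned(empty);
        incremental.onPartitionsAssigned(empty);

        System.out.println("resetting tracker knows mytopic-0: "
                + resetting.lastCommittedOffsets.containsKey("mytopic-0"));
        System.out.println("incremental tracker knows mytopic-0: "
                + incremental.lastCommittedOffsets.containsKey("mytopic-0"));
    }
}
```

With the callback sequence from the logs, the resetting tracker no longer holds an entry for mytopic-0, which corresponds to the failed containsKey check in commitOffsets, while the incremental tracker keeps the entry until the partition is revoked.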



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
