[ https://issues.apache.org/jira/browse/KAFKA-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
dc-heros reassigned KAFKA-12857:
--------------------------------

    Assignee: dc-heros

Using Connect Sink with CooperativeStickyAssignor results in commit offsets failure
-----------------------------------------------------------------------------------

                Key: KAFKA-12857
                URL: https://issues.apache.org/jira/browse/KAFKA-12857
            Project: Kafka
         Issue Type: Bug
         Components: KafkaConnect
   Affects Versions: 2.7.1
        Environment: Linux
           Reporter: Oliver Hsu
           Assignee: dc-heros
           Priority: Major

We are attempting to use a Kafka Connect Sink Connector with the {{CooperativeStickyAssignor}} assignment strategy. When we use {{CooperativeStickyAssignor}}, offset commits sometimes fail with:

{{[2021-05-26 22:03:36,435] WARN WorkerSinkTask\{id=sink-connector-7} Ignoring invalid task provided offset mytopic-0/OffsetAndMetadata\{offset=16305575, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[mytopic-0] (org.apache.kafka.connect.runtime.WorkerSinkTask:434)}}

Note that the partition reported as invalid in the warning message ({{mytopic-0}}) is in fact present in the reported assignment.

*Config changes*

{{consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor}}

*Cooperative vs Eager Assignment Strategy background*

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceListenerandConsumerPartitionAssignorSemantics]

With eager assignment:
{quote}Listener#onPartitionsAssigned: called on the full set of assigned partitions (may have overlap with the partitions passed to #onPartitionsRevoked
{quote}
With cooperative assignment:
{quote}Listener#onPartitionsAssigned: called on the subset of assigned partitions that were not previously owned before this rebalance. There should be no overlap with the revoked partitions (if any). This will always be called, even if there are no new partitions being assigned to a given member.
{quote}
This means that with cooperative assignment, {{onPartitionsAssigned}} may be called with a partial assignment or with an empty collection.

However, the [WorkerSinkTask.HandleRebalance|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L680] class assumes that {{onPartitionsAssigned}} is called with the full set of assigned partitions, which is true for eager assignment but not for cooperative assignment.
{code:java|title=WorkerSinkTask.HandleRebalance.java|borderStyle=solid}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
    // Both maps are replaced on every call, discarding previously tracked partitions
    lastCommittedOffsets = new HashMap<>();
    currentOffsets = new HashMap<>();
    for (TopicPartition tp : partitions) {
        long pos = consumer.position(tp);
        lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
        currentOffsets.put(tp, new OffsetAndMetadata(pos));
        log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
    }
    ...
}
{code}
{{onPartitionsAssigned}} replaces {{lastCommittedOffsets}} and {{currentOffsets}} with new, empty {{HashMap}}s and then puts only the offsets of the {{partitions}} passed to that particular call into them. Any offsets recorded by an earlier call are discarded.
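To make the effect of this reset concrete, here is a small dependency-free model of the handler quoted above. It is a sketch under stated assumptions, not Kafka code: {{TopicPartition}} is a hypothetical stand-in record (not {{org.apache.kafka.common.TopicPartition}}), offsets are plain {{long}}s instead of {{OffsetAndMetadata}}, and a {{positions}} map replaces {{consumer.position(tp)}}. It replays an eager call sequence and the cooperative sequence seen in this issue's logs (one call with the partition, then calls with an empty collection).

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the reset-on-assign behavior; not taken from Kafka.
public class ResetOnAssignDemo {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption for illustration).
    record TopicPartition(String topic, int partition) {}

    // Offsets modeled as plain longs instead of OffsetAndMetadata.
    private Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();

    // Same shape as the quoted handler: the map is replaced on every call,
    // so only the partitions passed to the most recent call survive.
    void onPartitionsAssigned(Collection<TopicPartition> partitions,
                              Map<TopicPartition, Long> positions) {
        lastCommittedOffsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            lastCommittedOffsets.put(tp, positions.get(tp));
        }
    }

    // Replays a sequence of onPartitionsAssigned calls on a fresh task
    // and returns the resulting lastCommittedOffsets.
    static Map<TopicPartition, Long> replay(List<List<TopicPartition>> calls,
                                            Map<TopicPartition, Long> positions) {
        ResetOnAssignDemo task = new ResetOnAssignDemo();
        for (List<TopicPartition> call : calls) {
            task.onPartitionsAssigned(call, positions);
        }
        return task.lastCommittedOffsets;
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("mytopic", 0);
        Map<TopicPartition, Long> positions = Map.of(tp, 16305575L);

        // Eager: every call carries the full assignment.
        Map<TopicPartition, Long> eager =
                replay(List.of(List.of(tp), List.of(tp)), positions);

        // Cooperative: one call with the new partition, then calls with an
        // empty collection, matching the DEBUG log sequence in this issue.
        Map<TopicPartition, Long> cooperative =
                replay(List.of(List.of(tp), List.of(), List.of()), positions);

        System.out.println("eager:       " + eager);
        System.out.println("cooperative: " + cooperative);
    }
}
```

The eager replay still tracks {{mytopic-0}}, while the cooperative replay ends with an empty map even though the modeled task never lost the partition.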
In the logs we see:
{{[2021-05-26 22:02:09,785] DEBUG WorkerSinkTask\{id=sink-connector-7} Partitions assigned [myTopic-0] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
{{[2021-05-26 22:02:13,063] DEBUG WorkerSinkTask\{id=sink-connector-7} Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
{{[2021-05-26 22:02:16,074] DEBUG WorkerSinkTask\{id=sink-connector-7} Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}

These logs show that, with the {{CooperativeStickyAssignor}}, {{onPartitionsAssigned}} is called first with the partition assigned to the task, followed by additional calls with an empty {{partitions}} collection.

Because every call to {{HandleRebalance.onPartitionsAssigned}} reinitializes the maps, the later calls with an empty collection leave {{lastCommittedOffsets}} as an empty {{HashMap}}, even though the task still owns the partition.

Inside [WorkerSinkTask.commitOffsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L415-L419], the {{commitableOffsets}} are seeded from {{lastCommittedOffsets}}, which is now an empty {{HashMap}}:
{code:java|title=WorkerSinkTask.java|borderStyle=solid}
private void commitOffsets(long now, boolean closing) {
    ...
    // Starts from lastCommittedOffsets, so any partition missing there is treated as unassigned
    final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
    for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
        final TopicPartition partition = taskProvidedOffsetEntry.getKey();
        final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
        if (commitableOffsets.containsKey(partition)) {
            long taskOffset = taskProvidedOffset.offset();
            long currentOffset = currentOffsets.get(partition).offset();
            if (taskOffset <= currentOffset) {
                commitableOffsets.put(partition, taskProvidedOffset);
            } else {
                log.warn("{} Ignoring invalid task provided offset {}/{} -- not yet consumed, taskOffset={} currentOffset={}",
                        this, partition, taskProvidedOffset, taskOffset, currentOffset);
            }
        } else {
            log.warn("{} Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}",
                    this, partition, taskProvidedOffset, consumer.assignment());
        }
    }
    ...
}
{code}
{{if (commitableOffsets.containsKey(partition))}} evaluates to {{false}} because {{commitableOffsets}} is an empty {{HashMap}} copied from the empty {{lastCommittedOffsets}}. This causes the {{Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}}} warning to be logged, and the task-provided offset to be left out of the commit, even though the task is assigned the partition.

For example:
{{[2021-05-26 22:03:36,435] WARN WorkerSinkTask\{id=sink-connector-7} Ignoring invalid task provided offset mytopic-0/OffsetAndMetadata\{offset=16305575, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[mytopic-0] (org.apache.kafka.connect.runtime.WorkerSinkTask:434)}}

*Recommended Changes*

{{WorkerSinkTask.HandleRebalance.onPartitionsAssigned}} needs to handle the cooperative assignment strategy, which may call {{onPartitionsAssigned}} with only the subset of newly assigned partitions, or with an empty collection if no new partitions are being assigned to a given member. When initializing {{lastCommittedOffsets}}, it must therefore preserve the offsets of partitions the task still owns instead of replacing the map on every call.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
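One possible shape for the recommended change is sketched below: offsets are added incrementally in {{onPartitionsAssigned}} and removed only in {{onPartitionsRevoked}}, and a {{committableOffsets}} helper mirrors the {{containsKey}} and {{taskOffset <= currentOffset}} checks of {{commitOffsets}}. This is a hypothetical, dependency-free model, not code from the Kafka codebase: {{TopicPartition}} is a stand-in record, offsets are plain {{long}}s, and the {{positions}} map replaces {{consumer.position(tp)}}. Because the eager protocol revokes every owned partition before reassigning the full set, the same bookkeeping should also behave correctly there; a real {{ConsumerRebalanceListener}} would likely need to drop state in {{onPartitionsLost}} as well.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of cooperative-aware offset bookkeeping; not Kafka code.
public class IncrementalAssignSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption for illustration).
    record TopicPartition(String topic, int partition) {}

    private final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();
    private final Map<TopicPartition, Long> currentOffsets = new HashMap<>();

    // 'partitions' holds only newly added partitions (possibly none),
    // so add them to the existing maps instead of replacing the maps.
    void onPartitionsAssigned(Collection<TopicPartition> partitions,
                              Map<TopicPartition, Long> positions) {
        for (TopicPartition tp : partitions) {
            long pos = positions.get(tp);
            lastCommittedOffsets.put(tp, pos);
            currentOffsets.put(tp, pos);
        }
    }

    // Only revoked partitions have their tracked offsets dropped.
    void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            lastCommittedOffsets.remove(tp);
            currentOffsets.remove(tp);
        }
    }

    // Mirrors the checks in commitOffsets: the partition must be tracked and
    // the task-provided offset must not be ahead of what has been consumed.
    Map<TopicPartition, Long> committableOffsets(Map<TopicPartition, Long> taskProvided) {
        Map<TopicPartition, Long> committable = new HashMap<>(lastCommittedOffsets);
        for (Map.Entry<TopicPartition, Long> entry : taskProvided.entrySet()) {
            TopicPartition partition = entry.getKey();
            long taskOffset = entry.getValue();
            if (committable.containsKey(partition) && taskOffset <= currentOffsets.get(partition)) {
                committable.put(partition, taskOffset);
            }
        }
        return committable;
    }

    public static void main(String[] args) {
        IncrementalAssignSketch task = new IncrementalAssignSketch();
        TopicPartition tp = new TopicPartition("mytopic", 0);
        Map<TopicPartition, Long> positions = Map.of(tp, 16305575L);

        // Same call sequence as the DEBUG logs in this issue.
        task.onPartitionsAssigned(List.of(tp), positions);
        task.onPartitionsAssigned(List.of(), positions);
        task.onPartitionsAssigned(List.of(), positions);

        // The partition is still tracked, so the task-provided offset is accepted.
        System.out.println(task.committableOffsets(Map.of(tp, 16305575L)));
    }
}
```

With this bookkeeping the empty cooperative assignments are no-ops, so {{mytopic-0}} stays committable until it is actually revoked.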