[ 
https://issues.apache.org/jira/browse/KAFKA-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oliver Hsu updated KAFKA-12857:
-------------------------------
    Description: 
We are attempting to use a Kafka Connect Sink Connector with the 
{{CooperativeStickyAssignor}} assignment strategy. With this assignor, 
offset commits sometimes fail with the following warning:

{{[2021-05-26 22:03:36,435] WARN WorkerSinkTask\{id=sink-connector-7} Ignoring 
invalid task provided offset mytopic-0/OffsetAndMetadata\{offset=16305575, 
leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[mytopic-0] 
(org.apache.kafka.connect.runtime.WorkerSinkTask:434)}}

Note that the partition reported as not assigned ({{mytopic-0}}) appears in 
the assignment printed by the same warning.

*Config changes*

{{consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor}}

*Cooperative vs Eager Assignment Strategy background*
 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceListenerandConsumerPartitionAssignorSemantics]

With eager assignment:
{quote}Listener#onPartitionsAssigned: called on the full set of assigned 
partitions (may have overlap with the partitions passed to #onPartitionsRevoked)
{quote}
With cooperative assignment:
{quote}Listener#onPartitionsAssigned: called on the subset of assigned 
partitions that were not previously owned before this rebalance. There should 
be no overlap with the revoked partitions (if any). This will always be called, 
even if there are no new partitions being assigned to a given member.
{quote}
This means that with cooperative assignment, {{onPartitionsAssigned}} may be 
called with an empty collection.
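
As a rough illustration of the two semantics quoted above, the sketch below computes the argument each protocol would pass to the listener. It is a toy model that uses plain strings for partitions; the class and method names ({{CallbackArgs}}, {{eagerAssigned}}, {{cooperativeAssigned}}) are hypothetical and are not part of the Kafka client.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model of the listener argument under each protocol (not Kafka client code).
public class CallbackArgs {
    // Eager: the listener receives the full new assignment, regardless of prior ownership.
    public static Set<String> eagerAssigned(Set<String> owned, Set<String> assignment) {
        return new LinkedHashSet<>(assignment);
    }

    // Cooperative: the listener receives only partitions not owned before the rebalance.
    public static Set<String> cooperativeAssigned(Set<String> owned, Set<String> assignment) {
        Set<String> added = new LinkedHashSet<>(assignment);
        added.removeAll(owned);
        return added;
    }

    public static void main(String[] args) {
        Set<String> owned = Set.of("mytopic-0");
        Set<String> assignment = Set.of("mytopic-0");
        System.out.println("eager: " + eagerAssigned(owned, assignment));
        System.out.println("cooperative: " + cooperativeAssigned(owned, assignment));
    }
}
```

When a member keeps the partition it already owns, the cooperative variant yields an empty set while the eager variant yields the full assignment.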

However, the 
[WorkerSinkTask.HandleRebalance|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L680]
 class assumes that {{onPartitionsAssigned}} is called with the full set of 
assigned partitions, which is true for eager assignment but not for cooperative 
assignment.
{code:java|title=WorkerSinkTask.HandleRebalance.java|borderStyle=solid}
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
            lastCommittedOffsets = new HashMap<>();
            currentOffsets = new HashMap<>();
            for (TopicPartition tp : partitions) {
                long pos = consumer.position(tp);
                lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
                currentOffsets.put(tp, new OffsetAndMetadata(pos));
                log.debug("{} Assigned topic partition {} with offset {}", WorkerSinkTask.this, tp, pos);
            }
            ...
        }
{code}
On every call, {{onPartitionsAssigned}} replaces {{lastCommittedOffsets}} and 
{{currentOffsets}} with new empty {{HashMap}} instances and then adds only the 
offsets of the {{partitions}} passed to that call.

In the logs we see:
{{[2021-05-26 22:02:09,785] DEBUG WorkerSinkTask\{id=sink-connector-7} 
Partitions assigned [myTopic-0] 
(org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
{{[2021-05-26 22:02:13,063] DEBUG WorkerSinkTask\{id=sink-connector-7} 
Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
{{[2021-05-26 22:02:16,074] DEBUG WorkerSinkTask\{id=sink-connector-7} 
Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}

These logs show that, with {{CooperativeStickyAssignor}}, 
{{onPartitionsAssigned}} is called first with the partition assigned to the 
task and then again with an empty {{partitions}} collection.

Because each call to {{HandleRebalance.onPartitionsAssigned}} resets the maps, 
the later calls with empty collections leave {{lastCommittedOffsets}} as an 
empty {{HashMap}}, even though the task still owns the partition.
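
The sequence seen in the logs can be reproduced with a small stand-alone model. This is only a sketch: {{ResetOnAssign}} is a hypothetical stand-in for the offset bookkeeping in {{HandleRebalance}}, partitions are plain strings, and the position value {{100L}} is a placeholder rather than a value from the logs.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for the offset bookkeeping in HandleRebalance (not Kafka code).
public class ResetOnAssign {
    Map<String, Long> lastCommittedOffsets = new HashMap<>();

    // Mirrors the reported behaviour: the map is replaced on every callback.
    void onPartitionsAssigned(Collection<String> partitions, long position) {
        lastCommittedOffsets = new HashMap<>();
        for (String tp : partitions) {
            lastCommittedOffsets.put(tp, position);
        }
    }

    public static void main(String[] args) {
        ResetOnAssign task = new ResetOnAssign();
        // First callback carries the assigned partition.
        task.onPartitionsAssigned(List.of("myTopic-0"), 100L);
        System.out.println("after first call: " + task.lastCommittedOffsets.keySet());
        // A later cooperative callback carries no new partitions and wipes the state.
        task.onPartitionsAssigned(List.of(), 0L);
        System.out.println("after empty call: " + task.lastCommittedOffsets.keySet());
    }
}
```

After the second call the map no longer contains {{myTopic-0}}, which matches the state described above.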

Inside 
[WorkerSinkTask.commitOffsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L415-L419],
 {{commitableOffsets}} is initialized from {{lastCommittedOffsets}}, which is 
an empty {{HashMap}}:
{code:java|title=WorkerSinkTask.java|borderStyle=solid}
private void commitOffsets(long now, boolean closing) {
...
        final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
            final TopicPartition partition = taskProvidedOffsetEntry.getKey();
            final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
            if (commitableOffsets.containsKey(partition)) {
                long taskOffset = taskProvidedOffset.offset();
                long currentOffset = currentOffsets.get(partition).offset();
                if (taskOffset <= currentOffset) {
                    commitableOffsets.put(partition, taskProvidedOffset);
                } else {
                    log.warn("{} Ignoring invalid task provided offset {}/{} -- not yet consumed, taskOffset={} currentOffset={}",
                            this, partition, taskProvidedOffset, taskOffset, currentOffset);
                }
            } else {
                log.warn("{} Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}",
                        this, partition, taskProvidedOffset, consumer.assignment());
            }
        }
...
}
{code}
{{if (commitableOffsets.containsKey(partition))}} is {{false}} because 
{{commitableOffsets}} is a copy of the empty {{lastCommittedOffsets}} 
{{HashMap}}. As a result, the {{partition not assigned}} warning is logged even 
though the task is assigned the partition, e.g.

{{[2021-05-26 22:03:36,435] WARN WorkerSinkTask\{id=sink-connector-7} Ignoring 
invalid task provided offset mytopic-0/OffsetAndMetadata\{offset=16305575, 
leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[mytopic-0] 
(org.apache.kafka.connect.runtime.WorkerSinkTask:434)}}

*Recommended Changes*
 {{WorkerSinkTask.HandleRebalance.onPartitionsAssigned}} needs to handle the 
cooperative assignment strategy when initializing {{lastCommittedOffsets}}: 
it may be called with only a subset of the assigned partitions, or with an 
empty collection if no new partitions are assigned to a given member.
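
One possible direction, shown here only as a sketch and not as the actual Kafka change, is to update the maps incrementally: add entries for newly assigned partitions and remove entries only for partitions that are revoked. The class {{IncrementalOffsets}} and its string-keyed maps are hypothetical simplifications of the fields in {{WorkerSinkTask}}, and the position value is a placeholder.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of incremental offset bookkeeping; not the actual Kafka fix.
public class IncrementalOffsets {
    final Map<String, Long> lastCommittedOffsets = new HashMap<>();
    final Map<String, Long> currentOffsets = new HashMap<>();

    // Add only the newly assigned partitions; an empty collection changes nothing.
    void onPartitionsAssigned(Collection<String> partitions, long position) {
        for (String tp : partitions) {
            lastCommittedOffsets.put(tp, position);
            currentOffsets.put(tp, position);
        }
    }

    // Drop state only for partitions that were actually revoked.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String tp : partitions) {
            lastCommittedOffsets.remove(tp);
            currentOffsets.remove(tp);
        }
    }

    public static void main(String[] args) {
        IncrementalOffsets task = new IncrementalOffsets();
        task.onPartitionsAssigned(List.of("myTopic-0"), 100L);
        // Later cooperative callback with no new partitions keeps existing state.
        task.onPartitionsAssigned(List.of(), 0L);
        System.out.println("tracked: " + task.lastCommittedOffsets.keySet());
        task.onPartitionsRevoked(List.of("myTopic-0"));
        System.out.println("after revoke: " + task.lastCommittedOffsets.keySet());
    }
}
```

Under the eager protocol the consumer revokes all owned partitions before each rebalance, so the same bookkeeping should also cover that case.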

 


> Using Connect Sink with CooperativeStickyAssignor results in commit offsets 
> failure
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-12857
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12857
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.7.1
>         Environment: Linux
>            Reporter: Oliver Hsu
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
