[jira] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14220 ]


Pritam Kumar deleted comment on KAFKA-14220:
--

was (Author: JIRAUSER295638):
The following is the patch made and verified:
*https://github.com/apache/kafka/pull/12622*

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.0.1
> Reporter: Pritam Kumar
> Priority: Major
> Labels: pull-request-available
>
> Issue:
> When partitions are revoked, the "partition-count" metric is updated before
> the new set of assignments is applied. In the "ConsumerCoordinator" class,
> "onJoinComplete" calls "invokePartitionsRevoked" before
> "subscriptions.assignFromSubscribed(assignedPartitions)". As a result, the
> metric keeps reporting the old assigned partition count, even after later
> rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> Then another worker joins, and as a result “partition-2” has to be revoked.
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> The "assignment" is only updated with this new assignment by the following
> call:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>  
> However, "{*}updatePartitionCount{*}()" is called before this, via
> "{*}invokePartitionsRevoked{*}":
>  
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, 
> invokePartitionsRevoked(revokedPartitions));|
>  
> Because of this, when "updatePartitionCount()" reads the
> "{*}assignedPartitions{*}" of the "{*}consumer{*}" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set<TopicPartition> assignment() {|
>  
> the "{*}assignedPartitions{*}" has not yet been updated.
> Solution:
> The bug fix for KAFKA-12622 introduces code changes that update the
> partition count metric after the newly assigned partitions are registered.
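
The ordering problem described above can be sketched with a small toy model. This is not Kafka code: the class PartitionCountOrdering, its fields, and the rebalance(...) method are hypothetical names made up for illustration, and the rule that the metric is otherwise refreshed only when partitions are added is an assumption taken from the demo in the report. The sketch only shows why reading the assignment before it is replaced leaves the count stale.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy model of the reported ordering; all names here are hypothetical. */
public class PartitionCountOrdering {
    // Stand-in for what the consumer's assignment() would return.
    private Set<String> assignment = new LinkedHashSet<>();
    // Stand-in for the "partition-count" metric.
    private int partitionCountMetric = 0;

    /** Returns the elements of a that are not in b (a - b). */
    public static Set<String> difference(Set<String> a, Set<String> b) {
        Set<String> result = new LinkedHashSet<>(a);
        result.removeAll(b);
        return result;
    }

    // Models the metric refresh: it simply reads the current assignment size.
    private void updatePartitionCount() {
        partitionCountMetric = assignment.size();
    }

    /**
     * Applies a new assignment and returns the metric value afterwards.
     * metricAfterAssignment = false models the reported order (metric refreshed
     * in the revocation step, before the assignment is replaced);
     * true models the proposed order (metric refreshed after the replacement).
     */
    public int rebalance(Set<String> assigned, boolean metricAfterAssignment) {
        Set<String> revoked = difference(assignment, assigned); // owned - assigned
        Set<String> added = difference(assigned, assignment);   // assigned - owned

        if (!metricAfterAssignment && !revoked.isEmpty()) {
            updatePartitionCount(); // still sees the old assignment
        }

        assignment = new LinkedHashSet<>(assigned); // the assignment update

        // Assumption: without the fix, the metric is refreshed here only when
        // partitions were added.
        if (metricAfterAssignment || !added.isEmpty()) {
            updatePartitionCount();
        }
        return partitionCountMetric;
    }

    public static void main(String[] args) {
        Set<String> first = Set.of("partition-1", "partition-2");
        Set<String> second = Set.of("partition-1");

        PartitionCountOrdering reported = new PartitionCountOrdering();
        reported.rebalance(first, false);
        System.out.println(reported.rebalance(second, false)); // prints 2 (stale)

        PartitionCountOrdering proposed = new PartitionCountOrdering();
        proposed.rebalance(first, true);
        System.out.println(proposed.rebalance(second, true));  // prints 1
    }
}
```

With the demo's two rebalances, the reported order leaves the metric at 2 after "partition-2" is revoked, while refreshing it after the assignment update gives 1.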



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14220 ]


Pritam Kumar deleted comment on KAFKA-14220:
--

was (Author: JIRAUSER295638):
https://github.com/apache/kafka/pull/12622



