[jira] [Commented] (KAFKA-14021) MirrorMaker 2 should implement KIP-618 APIs

2024-06-26 Thread Pritam Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860164#comment-17860164
 ] 

Pritam Kumar commented on KAFKA-14021:
--

That we are doing on the start method, we are reading from the global topic, 
but start will be called only once and in between the consumer will rely while 
polling on the __consumer_offsets topics for poll the batch right?

> MirrorMaker 2 should implement KIP-618 APIs
> ---
>
> Key: KAFKA-14021
> URL: https://issues.apache.org/jira/browse/KAFKA-14021
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect, mirrormaker
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.5.0
>
>
> The {{MirrorSourceConnector}} class should implement the new APIs added by 
> KIP-618.
> This includes the 
> [SourceConnector::exactlyOnceSupport|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L34-L54]
>  method and, potentially, the 
> [SourceConnector::canDefineTransactionBoundaries|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L56-L73]
>  method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14021) MirrorMaker 2 should implement KIP-618 APIs

2024-06-26 Thread Pritam Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860162#comment-17860162
 ] 

Pritam Kumar edited comment on KAFKA-14021 at 6/26/24 1:26 PM:
---

Yes [~ChrisEgerton] the thing is the consumer will poll the records and if 
TransactionBoundary is set to poll the transaction will start and the producer 
will producer the records to the destination cluster and commit the offsets to 
the global or user configured offset topic and transaction will end. Now 
task.commit is called and suppose this commit is failed. After this if the task 
is started the consumer will again poll from the last offset only and the 
records will be duplicated in the destination cluster. 

I hope I understood this correctly, but what I meant to say without "offset 
syncs" can we guarantee exactly once delivery of records.

Sorry If I have mis-understood something here.


was (Author: JIRAUSER301755):
Yes [~ChrisEgerton] the thing is the consumer will poll the records and if 
TransactionBoundary is set to poll the transaction will start and the producer 
will producer the records to the destination cluster and commit the offsets to 
the global or user configured offset topic and transaction will end. Now 
task.commit is called and suppose this commit is failed. After this if the task 
is started the consumer will again poll from the last offset only and the 
records will be duplicated in the destination cluster. 

I hope I understood this correctly, but what I meant to say without "offset 
syncs" can we guarantee exactly once delivery of records.

> MirrorMaker 2 should implement KIP-618 APIs
> ---
>
> Key: KAFKA-14021
> URL: https://issues.apache.org/jira/browse/KAFKA-14021
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect, mirrormaker
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.5.0
>
>
> The {{MirrorSourceConnector}} class should implement the new APIs added by 
> KIP-618.
> This includes the 
> [SourceConnector::exactlyOnceSupport|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L34-L54]
>  method and, potentially, the 
> [SourceConnector::canDefineTransactionBoundaries|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L56-L73]
>  method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14021) MirrorMaker 2 should implement KIP-618 APIs

2024-06-26 Thread Pritam Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860162#comment-17860162
 ] 

Pritam Kumar commented on KAFKA-14021:
--

Yes [~ChrisEgerton] the thing is the consumer will poll the records and if 
TransactionBoundary is set to poll the transaction will start and the producer 
will producer the records to the destination cluster and commit the offsets to 
the global or user configured offset topic and transaction will end. Now 
task.commit is called and suppose this commit is failed. After this if the task 
is started the consumer will again poll from the last offset only and the 
records will be duplicated in the destination cluster. 

I hope I understood this correctly, but what I meant to say without "offset 
syncs" can we guarantee exactly once delivery of records.

> MirrorMaker 2 should implement KIP-618 APIs
> ---
>
> Key: KAFKA-14021
> URL: https://issues.apache.org/jira/browse/KAFKA-14021
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect, mirrormaker
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.5.0
>
>
> The {{MirrorSourceConnector}} class should implement the new APIs added by 
> KIP-618.
> This includes the 
> [SourceConnector::exactlyOnceSupport|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L34-L54]
>  method and, potentially, the 
> [SourceConnector::canDefineTransactionBoundaries|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L56-L73]
>  method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14021) MirrorMaker 2 should implement KIP-618 APIs

2024-06-26 Thread Pritam Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860154#comment-17860154
 ] 

Pritam Kumar commented on KAFKA-14021:
--

[~ChrisEgerton] Although we have implemented the 

@Override
public ExactlyOnceSupport exactlyOnceSupport(Map props) {
return consumerUsesReadCommitted(props)
? ExactlyOnceSupport.SUPPORTED
: ExactlyOnceSupport.UNSUPPORTED;
}

In the source connector, but one thing we should make sure that before polling 
the consumer in the source connector should sync the offsets between the 
offsets committed in the offset.storage.topic and __offsets topic which a plain 
consumer uses. The sync is happening in the task.commit() call which is called 
outside the transaction.



private void commitTransaction() {
log.debug("{} Committing offsets", this);

long started = time.milliseconds();

AtomicReference flushError = new AtomicReference<>();
boolean shouldFlush = false;
try {
// Begin the flush without waiting, as there should not be any concurrent 
flushes.
// This is because commitTransaction is always called on the same thread, and 
should always block until
// the flush is complete, or cancel the flush if an error occurs.
shouldFlush = offsetWriter.beginFlush();
} catch (Throwable e) {
flushError.compareAndSet(null, e);
}
if (flushError.get() == null && !transactionOpen && !shouldFlush) {
// There is no contents on the framework side to commit, so skip the offset 
flush and producer commit
long durationMillis = time.milliseconds() - started;
recordCommitSuccess(durationMillis);
log.debug("{} Finished commitOffsets successfully in {} ms", this, 
durationMillis);

commitSourceTask();
return;
}

// We might have just aborted a transaction, in which case we'll have to begin 
a new one
// in order to commit offsets
maybeBeginTransaction();

if (shouldFlush) {
// Now we can actually write the offsets to the internal topic.
// No need to track the flush future here since it's guaranteed to complete by 
the time
// Producer::commitTransaction completes
// We do have to track failures for that callback though, since they may 
originate from outside
// the producer (i.e., the offset writer or the backing offset store), and 
would not cause
// Producer::commitTransaction to fail
offsetWriter.doFlush((error, result) -> {
if (error != null) {
log.error("{} Failed to flush offsets to storage: ", 
ExactlyOnceWorkerSourceTask.this, error);
flushError.compareAndSet(null, error);
} else {
log.trace("{} Finished flushing offsets to storage", 
ExactlyOnceWorkerSourceTask.this);
}
});
}

// Only commit the transaction if we were able to serialize the offsets.
// Otherwise, we may commit source records without committing their offsets
Throwable error = flushError.get();
if (error == null) {
try {
// Commit the transaction
// Blocks until all outstanding records have been sent and ack'd
+_*producer.commitTransaction();*_+
} catch (Throwable t) {
log.error("{} Failed to commit producer transaction", 
ExactlyOnceWorkerSourceTask.this, t);
flushError.compareAndSet(null, t);
}
transactionOpen = false;
}

error = flushError.get();
if (error != null) {
recordCommitFailure(time.milliseconds() - started, null);
offsetWriter.cancelFlush();
throw maybeWrapProducerSendException(
"Failed to flush offsets and/or records for task " + id,
error
);
}

transactionMetrics.commitTransaction();

long durationMillis = time.milliseconds() - started;
recordCommitSuccess(durationMillis);
log.debug("{} Finished commitOffsets successfully in {} ms", this, 
durationMillis);

// Synchronize in order to guarantee that writes on other threads are picked up 
by this one
synchronized (committableRecords) {
committableRecords.forEach(this::commitTaskRecord);
committableRecords.clear();
}
+_*commitSourceTask();*_+
}

Which could still result in the duplicate records.

Can you please help me out here.

> MirrorMaker 2 should implement KIP-618 APIs
> ---
>
> Key: KAFKA-14021
> URL: https://issues.apache.org/jira/browse/KAFKA-14021
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect, mirrormaker
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.5.0
>
>
> The {{MirrorSourceConnector}} class should implement the new APIs added by 
> KIP-618.
> This includes the 
> [SourceConnector::exactlyOnceSupport|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L34-L54]
>  method and, potentially, the 
> [SourceConnector::canDefineTransactionBoundaries|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L56-L73]
>  method.



--
This message was sent by 

[jira] [Commented] (KAFKA-13679) Superfluous node disconnected log messages

2024-01-07 Thread Pritam Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804157#comment-17804157
 ] 

Pritam Kumar commented on KAFKA-13679:
--

[~ChrisEgerton] any thoughts on the above issue, as it is flooding the logs. 
Thanks.

> Superfluous node disconnected log messages
> --
>
> Key: KAFKA-13679
> URL: https://issues.apache.org/jira/browse/KAFKA-13679
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: Philip Bourke
>Priority: Minor
>
> In Kafka 3.1 the "{_}Node x disconnected{_}" log message in the 
> {{NetworkClient.java}} class was changed from DEBUG to INFO - 
> [https://github.com/apache/kafka/commit/79d97bd29d059e8ba8ee7726b49d76e03e281059#diff-dcc1af531d191de8da1e23ad6d878a3efc463ba4670dbcf2896295a9dacd1c18R935]
> Now my application logs are full of node disconnected messages and it would 
> indicate that there may be a connectivity problem. However I can see that the 
> logs are getting written every 5 minutes exactly, and it's the AdminClient 
> that is writing the logs.
> {code:bash}
> 2022-02-16 14:45:39,277 [d-60105f051cdb-admin] INFO   
> o.apache.kafka.clients.NetworkClient - [AdminClient 
> clientId=desktop-session-internal-user-streamer-v1-9888ff1d-446e-40cd-88dd-60105f051cdb-admin]
>  Node 1 disconnected.
> {code}
> My guess is that it may be the 
> [connections.max.idle.ms|https://kafka.apache.org/documentation/#adminclientconfigs_connections.max.idle.ms]
>  config setting, and there is in fact no issue with connectivity to the 
> brokers?
> I'm raising this ticket here because the logs are full of these repetitive 
> messages indicating an issue and setting off alarm bells, and also because I 
> did not get a response on the confluent forum or in any slack channels.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15680) Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing

2023-10-25 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-15680:
-
Fix Version/s: 3.6.1

> Partition-Count is not getting updated Correctly in the Incremental 
> Co-operative Rebalancing(ICR) Mode of Rebalancing
> -
>
> Key: KAFKA-15680
> URL: https://issues.apache.org/jira/browse/KAFKA-15680
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Assignee: Pritam Kumar
>Priority: Minor
> Fix For: 3.6.1
>
>
> * In ICR(Incremental Cooperative Rebalancing) mode, whenever a new worker, 
> say Worker 3 joins, a new global assignment is computed by the leader, say 
> Worker1, that results in the revocation of some tasks from each existing 
> worker i.e Worker1 and Worker2.
>  * Once the new member join is completed, 
> *ConsumerCoordinator.OnJoinComplete()* method is called which primarily 
> computes all the new partitions assigned and the partitions which are revoked 
> and updates the subscription Object.
>  * If it was the case of revocation which we check by checking the 
> “partitonsRevoked” list, we call the method {*}“invoke{*}PartitionRevoked()” 
> which internally calls “updatePartitionCount()” which fetches partition from 
> the *assignment* object which is yet not updated by the new assignment.
>  * It is only just before calling the “{*}invokePartitionsAssigned{*}()” 
> method that we update the *assignment* by invoking the following → 
> *subscriptions.assignFromSubscribed(assignedPartitions);*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15680) Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing

2023-10-24 Thread Pritam Kumar (Jira)
Pritam Kumar created KAFKA-15680:


 Summary: Partition-Count is not getting updated Correctly in the 
Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing
 Key: KAFKA-15680
 URL: https://issues.apache.org/jira/browse/KAFKA-15680
 Project: Kafka
  Issue Type: Bug
  Components: connect
Affects Versions: 3.0.1
Reporter: Pritam Kumar
Assignee: Pritam Kumar


* In ICR(Incremental Cooperative Rebalancing) mode, whenever a new worker, say 
Worker 3 joins, a new global assignment is computed by the leader, say Worker1, 
that results in the revocation of some tasks from each existing worker i.e 
Worker1 and Worker2.
 * Once the new member join is completed, 
*ConsumerCoordinator.OnJoinComplete()* method is called which primarily 
computes all the new partitions assigned and the partitions which are revoked 
and updates the subscription Object.
 * If it was the case of revocation which we check by checking the 
“partitonsRevoked” list, we call the method {*}“invoke{*}PartitionRevoked()” 
which internally calls “updatePartitionCount()” which fetches partition from 
the *assignment* object which is yet not updated by the new assignment.
 * It is only just before calling the “{*}invokePartitionsAssigned{*}()” method 
that we update the *assignment* by invoking the following → 
*subscriptions.assignFromSubscribed(assignedPartitions);*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-14220:
-
Description: (was: Issue:

In case of the revocation of partitions, the updation of "partition count" 
metrics is being done before updating the new set of assignments. 
"invokePartitionsRevoked" method of "onJoinComplete" function of 
"ConsumerCoordinator" class is being called before the "

subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As a 
result of which the old assigned partition count is getting updated again and 
again even after future rebalances.

Demo:

Suppose the current assignment is like:

Assigned partitions: [partition-1, partition-2]
Current owned partitions: []
Added partitions (assigned - owned): [partition-1, partition-2]
Revoked partitions (owned - assigned): []

After that some other worker joined and part of that, as a result of which 
“partition-2” has to be revoked.

Assigned partitions: [partition-1]
Current owned partitions: [partition-1, partition-2]
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): [partition-2]

But as the "assignment" need to be updated with these new assignment via the 
following logic:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]

Line 463 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||subscriptions.assignFromSubscribed(assignedPartitions);|

 

But before this only "{*}updatePartitionCount{*}()" is getting called via 
"{*}invokePartitionsRevoked{*}":

 
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]

Line 443 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||firstException.compareAndSet(null, 
invokePartitionsRevoked(revokedPartitions));|

 

Due to this when it is going to call for the "{*}assignedPartitions{*}" of 
"{*}consumer{*}" via the following:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]

Line 892 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||public Set assignment() {|

 

the "{*}assignedPartitions{*}" is not yet updated.

Solution:
As part of the bug fix to KAFKA-12622 introducing code changes to 
update the partition count metrics after the the newly assigned partition are 
registered.)

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar resolved KAFKA-14220.
--
  Reviewer:   (was: Chris Egerton)
Resolution: Abandoned

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>
> Issue:
> 
In case of the revocation of partitions, the updation of "partition count" 
> metrics is being done before updating the new set of assignments. 
> "invokePartitionsRevoked" method of "onJoinComplete" function of 
> "ConsumerCoordinator" class is being called before the "

> subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As 
> a result of which the old assigned partition count is getting updated again 
> and again even after future rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> After that some other worker joined and part of that, as a result of which 
> “partition-2” has to be revoked.
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> But as the "assignment" need to be updated with these new assignment via the 
> following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>  
> But before this only "{*}updatePartitionCount{*}()" is getting called via 
> "{*}invokePartitionsRevoked{*}":
>  
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, 
> invokePartitionsRevoked(revokedPartitions));|
>  
> Due to this when it is going to call for the "{*}assignedPartitions{*}" of 
> "{*}consumer{*}" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set assignment() {|
>  
> the "{*}assignedPartitions{*}" is not yet updated.
> Solution:
As part of the bug fix to KAFKA-12622 introducing code changes to 
> update the partition count metrics after the the newly assigned partition are 
> registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar closed KAFKA-14220.


> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>
> Issue:
> 
In case of the revocation of partitions, the updation of "partition count" 
> metrics is being done before updating the new set of assignments. 
> "invokePartitionsRevoked" method of "onJoinComplete" function of 
> "ConsumerCoordinator" class is being called before the "

> subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As 
> a result of which the old assigned partition count is getting updated again 
> and again even after future rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> After that some other worker joined and part of that, as a result of which 
> “partition-2” has to be revoked.
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> But as the "assignment" need to be updated with these new assignment via the 
> following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>  
> But before this only "{*}updatePartitionCount{*}()" is getting called via 
> "{*}invokePartitionsRevoked{*}":
>  
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, 
> invokePartitionsRevoked(revokedPartitions));|
>  
> Due to this when it is going to call for the "{*}assignedPartitions{*}" of 
> "{*}consumer{*}" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set assignment() {|
>  
> the "{*}assignedPartitions{*}" is not yet updated.
> Solution:
As part of the bug fix to KAFKA-12622 introducing code changes to 
> update the partition count metrics after the the newly assigned partition are 
> registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-14220:
-
Flags:   (was: Patch,Important)

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>
> Issue:
> 
In case of the revocation of partitions, the updation of "partition count" 
> metrics is being done before updating the new set of assignments. 
> "invokePartitionsRevoked" method of "onJoinComplete" function of 
> "ConsumerCoordinator" class is being called before the "

> subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As 
> a result of which the old assigned partition count is getting updated again 
> and again even after future rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> After that some other worker joined and part of that, as a result of which 
> “partition-2” has to be revoked.
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> But as the "assignment" need to be updated with these new assignment via the 
> following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>  
> But before this only "{*}updatePartitionCount{*}()" is getting called via 
> "{*}invokePartitionsRevoked{*}":
>  
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, 
> invokePartitionsRevoked(revokedPartitions));|
>  
> Due to this when it is going to call for the "{*}assignedPartitions{*}" of 
> "{*}consumer{*}" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set assignment() {|
>  
> the "{*}assignedPartitions{*}" is not yet updated.
> Solution:
As part of the bug fix to KAFKA-12622 introducing code changes to 
> update the partition count metrics after the the newly assigned partition are 
> registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-14220:
-
Labels:   (was: pull-request-available)

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>
> Issue:
> 
In case of the revocation of partitions, the updation of "partition count" 
> metrics is being done before updating the new set of assignments. 
> "invokePartitionsRevoked" method of "onJoinComplete" function of 
> "ConsumerCoordinator" class is being called before the "

> subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As 
> a result of which the old assigned partition count is getting updated again 
> and again even after future rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> After that some other worker joined and part of that, as a result of which 
> “partition-2” has to be revoked.
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> But as the "assignment" need to be updated with these new assignment via the 
> following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>  
> But before this only "{*}updatePartitionCount{*}()" is getting called via 
> "{*}invokePartitionsRevoked{*}":
>  
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, 
> invokePartitionsRevoked(revokedPartitions));|
>  
> Due to this when it is going to call for the "{*}assignedPartitions{*}" of 
> "{*}consumer{*}" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set assignment() {|
>  
> the "{*}assignedPartitions{*}" is not yet updated.
> Solution:
As part of the bug fix to KAFKA-12622 introducing code changes to 
> update the partition count metrics after the the newly assigned partition are 
> registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14220 ]


Pritam Kumar deleted comment on KAFKA-14220:
--

was (Author: JIRAUSER295638):
The following is the patch made and verified:
*https://github.com/apache/kafka/pull/12622*

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>  Labels: pull-request-available
>
> Issue:
> 
In case of the revocation of partitions, the updation of "partition count" 
> metrics is being done before updating the new set of assignments. 
> "invokePartitionsRevoked" method of "onJoinComplete" function of 
> "ConsumerCoordinator" class is being called before the "

> subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As 
> a result of which the old assigned partition count is getting updated again 
> and again even after future rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> After that some other worker joined and part of that, as a result of which 
> “partition-2” has to be revoked.
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> But as the "assignment" need to be updated with these new assignment via the 
> following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>  
> But before this only "{*}updatePartitionCount{*}()" is getting called via 
> "{*}invokePartitionsRevoked{*}":
>  
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, 
> invokePartitionsRevoked(revokedPartitions));|
>  
> Due to this when it is going to call for the "{*}assignedPartitions{*}" of 
> "{*}consumer{*}" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set assignment() {|
>  
> the "{*}assignedPartitions{*}" is not yet updated.
> Solution:
As part of the bug fix to KAFKA-12622 introducing code changes to 
> update the partition count metrics after the the newly assigned partition are 
> registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14220 ]


Pritam Kumar deleted comment on KAFKA-14220:
--

was (Author: JIRAUSER295638):
https://github.com/apache/kafka/pull/12622

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>  Labels: pull-request-available
>
> Issue:
> 
In case of the revocation of partitions, the updation of "partition count" 
> metrics is being done before updating the new set of assignments. 
> "invokePartitionsRevoked" method of "onJoinComplete" function of 
> "ConsumerCoordinator" class is being called before the "

> subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As 
> a result of which the old assigned partition count is getting updated again 
> and again even after future rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> After that some other worker joined and part of that, as a result of which 
> “partition-2” has to be revoked.
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> But as the "assignment" need to be updated with these new assignment via the 
> following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>  
> But before this only "{*}updatePartitionCount{*}()" is getting called via 
> "{*}invokePartitionsRevoked{*}":
>  
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, 
> invokePartitionsRevoked(revokedPartitions));|
>  
> Due to this when it is going to call for the "{*}assignedPartitions{*}" of 
> "{*}consumer{*}" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set assignment() {|
>  
> the "{*}assignedPartitions{*}" is not yet updated.
> Solution:
As part of the bug fix to KAFKA-12622 introducing code changes to 
> update the partition count metrics after the the newly assigned partition are 
> registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-14220:
-
Component/s: (was: KafkaConnect)

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>
> Issue:
> 
In case of the revocation of partitions, the updation of "partition count" 
> metrics is being done before updating the new set of assignments. 
> "invokePartitionsRevoked" method of "onJoinComplete" function of 
> "ConsumerCoordinator" class is being called before the "

> subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As 
> a result of which the old assigned partition count is getting updated again 
> and again even after future rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> After that some other worker joined and part of that, as a result of which 
> “partition-2” has to be revoked.
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> But as the "assignment" need to be updated with these new assignment via the 
> following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>  
> But before this only "{*}updatePartitionCount{*}()" is getting called via 
> "{*}invokePartitionsRevoked{*}":
>  
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, 
> invokePartitionsRevoked(revokedPartitions));|
>  
> Due to this when it is going to call for the "{*}assignedPartitions{*}" of 
> "{*}consumer{*}" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set assignment() {|
>  
> the "{*}assignedPartitions{*}" is not yet updated.
> Solution:
As part of the bug fix to KAFKA-12622 introducing code changes to 
> update the partition count metrics after the the newly assigned partition are 
> registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-13 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-14220:
-
Description: 
Issue:

In case of the revocation of partitions, the updation of "partition count" 
metrics is being done before updating the new set of assignments. 
"invokePartitionsRevoked" method of "onJoinComplete" function of 
"ConsumerCoordinator" class is being called before the "

subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As a 
result of which the old assigned partition count is getting updated again and 
again even after future rebalances.

Demo:

Suppose the current assignment is like:

Assigned partitions: [partition-1, partition-2]
Current owned partitions: []
Added partitions (assigned - owned): [partition-1, partition-2]
Revoked partitions (owned - assigned): []

After that some other worker joined and part of that, as a result of which 
“partition-2” has to be revoked.

Assigned partitions: [partition-1]
Current owned partitions: [partition-1, partition-2]
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): [partition-2]

But as the "assignment" need to be updated with these new assignment via the 
following logic:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]

Line 463 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||subscriptions.assignFromSubscribed(assignedPartitions);|

 

But before this only "{*}updatePartitionCount{*}()" is getting called via 
"{*}invokePartitionsRevoked{*}":

 
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]

Line 443 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||firstException.compareAndSet(null, 
invokePartitionsRevoked(revokedPartitions));|

 

Due to this when it is going to call for the "{*}assignedPartitions{*}" of 
"{*}consumer{*}" via the following:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]

Line 892 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||public Set assignment() {|

 

the "{*}assignedPartitions{*}" is not yet updated.

Solution:
As part of the bug fix to KAFKA-12622 introducing code changes to 
update the partition count metrics after the the newly assigned partition are 
registered.

  was:
In case of the revocation of partitions, the updation of "partition count" 
metrics is being done before updating the new set of assignments. 
"{*}invokePartitionsRevoked{*}" method of "{*}onJoinComplete{*}" function of 
"{*}ConsumerCoordinator{*}" class is being called before the "

{*}subscriptions.assignFromSubscribed(assignedPartitions){*}" of the same 
class. As a result of which the *old assigned partition count* is getting 
updated again and again even after future rebalances.


> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 3.0.1
>
>
> Issue:
> 
In case of the revocation of partitions, the updation of "partition count" 
> metrics is being done before updating the new set of assignments. 
> "invokePartitionsRevoked" method of "onJoinComplete" function of 
> "ConsumerCoordinator" class is being called before the "

> subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As 
> a result of which the old assigned partition count is getting updated again 
> and again even after future rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> After that some other worker joined and part of that, as a result of which 
> “partition-2” has to be revoked.
> 

[jira] [Commented] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-12 Thread Pritam Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603056#comment-17603056
 ] 

Pritam Kumar commented on KAFKA-14220:
--

The following is the patch made and verified:
*https://github.com/apache/kafka/pull/12622*

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>
> In case of the revocation of partitions, the updation of "partition count" 
> metrics is being done before updating the new set of assignments. 
> "{*}invokePartitionsRevoked{*}" method of "{*}onJoinComplete{*}" function of 
> "{*}ConsumerCoordinator{*}" class is being called before the "
> {*}subscriptions.assignFromSubscribed(assignedPartitions){*}" of the same 
> class. As a result of which the *old assigned partition count* is getting 
> updated again and again even after future rebalances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-12 Thread Pritam Kumar (Jira)
Pritam Kumar created KAFKA-14220:


 Summary: "partition-count" not getting updated after revocation of 
partitions in case of Incremental Co-operative rebalancing.
 Key: KAFKA-14220
 URL: https://issues.apache.org/jira/browse/KAFKA-14220
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.0.1
Reporter: Pritam Kumar


In case of the revocation of partitions, the updation of "partition count" 
metrics is being done before updating the new set of assignments. 
"{*}invokePartitionsRevoked{*}" method of "{*}onJoinComplete{*}" function of 
"{*}ConsumerCoordinator{*}" class is being called before the "

{*}subscriptions.assignFromSubscribed(assignedPartitions){*}" of the same 
class. As a result of which the *old assigned partition count* is getting 
updated again and again even after future rebalances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)