[jira] [Updated] (KAFKA-17599) Update Consumer Subscription with Current Assignment Before Partition Revocation
     [ https://issues.apache.org/jira/browse/KAFKA-17599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar updated KAFKA-17599:
---------------------------------
    Fix Version/s: 3.9.1

> Update Consumer Subscription with Current Assignment Before Partition Revocation
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-17599
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17599
>             Project: Kafka
>          Issue Type: Improvement
>          Components: connect, consumer, group-coordinator
>            Reporter: Pritam Kumar
>            Assignee: Pritam Kumar
>            Priority: Minor
>              Labels: connect, consumer, group-coordinator
>             Fix For: 3.9.1
>
>
> +_*Summary:*_+
> Ensure that the Kafka consumer subscription object is updated with the
> current partition assignment before partition revocation, so that the sink
> connector's close() method has access to the up-to-date assignment.
> *{+}_Description_{+}:*
> When Kafka Connect sink tasks handle partition revocation events, it is
> critical to update the consumer subscription object with the current
> partition assignment before the revocation process begins. This allows
> better management of partition state and ensures that partition ownership
> is clear and accurately reflected in the consumer group.
> +_*Current Behaviour:*_+
> When partitions are revoked during a rebalance, the current assignment is
> not explicitly updated in the consumer subscription object before
> revocation. As a result, close() runs against the old assignment state, and
> it is possible that open() was never called for that state.
> +_*Race Conditions:*_+
> This also leads to some very rare race conditions in sink connectors:
> depending on timing, a task sometimes sees the updated set of assigned
> partitions and sometimes the old one.
> Consider this: put() is called in a loop, and some of the work in put() uses
> "context.assignment" to read the current assignment. Say the task is
> *assigned [0, 1]*, *[2] is being added* to it, and *[0] is being removed*
> from it. If the calls happen in this order:
> 1. close() is called for partition [0].
> 2. put() is called with a batch of records.
> 3. open() is called for partition [2].
> then context.assignment inside put() returns [0, 1].
> But if the calls happen in this order:
> 1. close() is called for partition [0].
> 2. open() is called for partition [2].
> 3. put() is called with a batch of records.
> then context.assignment inside put() returns [1, 2].
> The first ordering exposes a stale assignment, so connectors behave
> inconsistently depending on which ordering occurs.
> +_*Proposed Behavior:*_+
> Before partition revocation occurs, update the consumer subscription object
> with the current partition assignment to ensure consistent state tracking
> and smoother transitions during rebalancing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
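The two call orders described in the KAFKA-17599 report can be replayed in a small toy model. This is only a sketch under stated assumptions: `StaleAssignmentDemo`, `ToyConsumer`, and `observedInPut` are hypothetical names invented for illustration, not Kafka Connect or consumer APIs, and the `updateBeforeRevocation` flag is one possible reading of the proposed behaviour rather than the actual patch.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model only: hypothetical classes, not Kafka Connect APIs.
final class StaleAssignmentDemo {

    /** Stand-in for the consumer-side state that context.assignment reads. */
    static final class ToyConsumer {
        private final Set<Integer> assignment = new TreeSet<>();

        ToyConsumer(Set<Integer> initial) {
            assignment.addAll(initial);
        }

        Set<Integer> assignment() {
            return new TreeSet<>(assignment); // defensive, sorted copy
        }

        void applyNewAssignment(Set<Integer> target) {
            assignment.clear();
            assignment.addAll(target);
        }
    }

    /**
     * Replays a sequence of "close", "put" and "open" calls for a task moving
     * from [0, 1] to [1, 2] and returns the assignment observed inside put().
     * With updateBeforeRevocation == false the new assignment is applied only
     * at open(), which models the reported behaviour.
     */
    static Set<Integer> observedInPut(List<String> callOrder, boolean updateBeforeRevocation) {
        ToyConsumer consumer = new ToyConsumer(Set.of(0, 1));
        Set<Integer> target = Set.of(1, 2);
        Set<Integer> seen = null;
        for (String call : callOrder) {
            switch (call) {
                case "close":
                    if (updateBeforeRevocation) {
                        consumer.applyNewAssignment(target);
                    }
                    break;
                case "open":
                    consumer.applyNewAssignment(target);
                    break;
                case "put":
                    seen = consumer.assignment();
                    break;
                default:
                    throw new IllegalArgumentException("unknown call: " + call);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observedInPut(List.of("close", "put", "open"), false)); // [0, 1]
        System.out.println(observedInPut(List.of("close", "open", "put"), false)); // [1, 2]
        System.out.println(observedInPut(List.of("close", "put", "open"), true));  // [1, 2]
    }
}
```

In this model, applying the new assignment before the revocation callback makes the value seen in put() independent of whether put() lands before or after open().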
[jira] [Updated] (KAFKA-17599) Update Consumer Subscription with Current Assignment Before Partition Revocation
     [ https://issues.apache.org/jira/browse/KAFKA-17599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar updated KAFKA-17599:
---------------------------------
    External issue URL: https://github.com/apache/kafka/pull/17255

> Update Consumer Subscription with Current Assignment Before Partition Revocation
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-17599
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17599
>             Project: Kafka
>          Issue Type: Improvement
>          Components: connect, consumer, group-coordinator
>            Reporter: Pritam Kumar
>            Priority: Minor
>              Labels: connect, consumer, group-coordinator
[jira] [Created] (KAFKA-17599) Update Consumer Subscription with Current Assignment Before Partition Revocation
Pritam Kumar created KAFKA-17599:
------------------------------------

             Summary: Update Consumer Subscription with Current Assignment Before Partition Revocation
                 Key: KAFKA-17599
                 URL: https://issues.apache.org/jira/browse/KAFKA-17599
             Project: Kafka
          Issue Type: Improvement
          Components: connect, consumer, group-coordinator
            Reporter: Pritam Kumar
[jira] [Commented] (KAFKA-14021) MirrorMaker 2 should implement KIP-618 APIs
    [ https://issues.apache.org/jira/browse/KAFKA-14021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860164#comment-17860164 ]

Pritam Kumar commented on KAFKA-14021:
--------------------------------------

We do that in the start method, where we read from the global offsets topic. But start is called only once; after that, the consumer relies on the __consumer_offsets topic when polling for each batch, right?

> MirrorMaker 2 should implement KIP-618 APIs
> -------------------------------------------
>
>                 Key: KAFKA-14021
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14021
>             Project: Kafka
>          Issue Type: Improvement
>          Components: connect, mirrormaker
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>             Fix For: 3.5.0
>
>
> The {{MirrorSourceConnector}} class should implement the new APIs added by
> KIP-618.
> This includes the
> [SourceConnector::exactlyOnceSupport|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L34-L54]
> method and, potentially, the
> [SourceConnector::canDefineTransactionBoundaries|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L56-L73]
> method.
[jira] [Comment Edited] (KAFKA-14021) MirrorMaker 2 should implement KIP-618 APIs
    [ https://issues.apache.org/jira/browse/KAFKA-14021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860162#comment-17860162 ]

Pritam Kumar edited comment on KAFKA-14021 at 6/26/24 1:26 PM:
---------------------------------------------------------------

Yes [~ChrisEgerton], the thing is this: the consumer polls the records and, if TransactionBoundary is set to poll, a transaction starts, the producer produces the records to the destination cluster and commits the offsets to the global or user-configured offsets topic, and the transaction ends. Now task.commit is called; suppose this commit fails. If the task is then restarted, the consumer will poll again from the last committed offset, and the records will be duplicated in the destination cluster.

I hope I understood this correctly. What I meant to ask is whether we can guarantee exactly-once delivery of records without "offset syncs".

Sorry if I have misunderstood something here.
[jira] [Commented] (KAFKA-14021) MirrorMaker 2 should implement KIP-618 APIs
    [ https://issues.apache.org/jira/browse/KAFKA-14021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860162#comment-17860162 ]

Pritam Kumar commented on KAFKA-14021:
--------------------------------------

Yes [~ChrisEgerton], the thing is this: the consumer polls the records and, if TransactionBoundary is set to poll, a transaction starts, the producer produces the records to the destination cluster and commits the offsets to the global or user-configured offsets topic, and the transaction ends. Now task.commit is called; suppose this commit fails. If the task is then restarted, the consumer will poll again from the last committed offset, and the records will be duplicated in the destination cluster.

I hope I understood this correctly. What I meant to ask is whether we can guarantee exactly-once delivery of records without "offset syncs".
[jira] [Commented] (KAFKA-14021) MirrorMaker 2 should implement KIP-618 APIs
    [ https://issues.apache.org/jira/browse/KAFKA-14021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860154#comment-17860154 ]

Pritam Kumar commented on KAFKA-14021:
--------------------------------------

[~ChrisEgerton] We have implemented the following in the source connector:
{code:java}
@Override
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
    return consumerUsesReadCommitted(props)
            ? ExactlyOnceSupport.SUPPORTED
            : ExactlyOnceSupport.UNSUPPORTED;
}
{code}
However, one thing we should make sure of is that, before polling, the consumer in the source connector syncs the offsets committed to the offset.storage.topic with those in the __consumer_offsets topic that a plain consumer uses. That sync happens in the task.commit() call, which is made outside the transaction:
{code:java}
private void commitTransaction() {
    log.debug("{} Committing offsets", this);

    long started = time.milliseconds();

    AtomicReference<Throwable> flushError = new AtomicReference<>();
    boolean shouldFlush = false;
    try {
        // Begin the flush without waiting, as there should not be any concurrent flushes.
        // This is because commitTransaction is always called on the same thread, and should always block until
        // the flush is complete, or cancel the flush if an error occurs.
        shouldFlush = offsetWriter.beginFlush();
    } catch (Throwable e) {
        flushError.compareAndSet(null, e);
    }
    if (flushError.get() == null && !transactionOpen && !shouldFlush) {
        // There is no contents on the framework side to commit, so skip the offset flush and producer commit
        long durationMillis = time.milliseconds() - started;
        recordCommitSuccess(durationMillis);
        log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);

        commitSourceTask();
        return;
    }

    // We might have just aborted a transaction, in which case we'll have to begin a new one
    // in order to commit offsets
    maybeBeginTransaction();

    if (shouldFlush) {
        // Now we can actually write the offsets to the internal topic.
        // No need to track the flush future here since it's guaranteed to complete by the time
        // Producer::commitTransaction completes
        // We do have to track failures for that callback though, since they may originate from outside
        // the producer (i.e., the offset writer or the backing offset store), and would not cause
        // Producer::commitTransaction to fail
        offsetWriter.doFlush((error, result) -> {
            if (error != null) {
                log.error("{} Failed to flush offsets to storage: ", ExactlyOnceWorkerSourceTask.this, error);
                flushError.compareAndSet(null, error);
            } else {
                log.trace("{} Finished flushing offsets to storage", ExactlyOnceWorkerSourceTask.this);
            }
        });
    }

    // Only commit the transaction if we were able to serialize the offsets.
    // Otherwise, we may commit source records without committing their offsets
    Throwable error = flushError.get();
    if (error == null) {
        try {
            // Commit the transaction
            // Blocks until all outstanding records have been sent and ack'd
            producer.commitTransaction(); // emphasized in the original comment
        } catch (Throwable t) {
            log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
            flushError.compareAndSet(null, t);
        }
        transactionOpen = false;
    }

    error = flushError.get();
    if (error != null) {
        recordCommitFailure(time.milliseconds() - started, null);
        offsetWriter.cancelFlush();
        throw maybeWrapProducerSendException(
                "Failed to flush offsets and/or records for task " + id,
                error
        );
    }

    transactionMetrics.commitTransaction();

    long durationMillis = time.milliseconds() - started;
    recordCommitSuccess(durationMillis);
    log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);

    // Synchronize in order to guarantee that writes on other threads are picked up by this one
    synchronized (committableRecords) {
        committableRecords.forEach(this::commitTaskRecord);
        committableRecords.clear();
    }
    commitSourceTask(); // emphasized in the original comment
}
{code}
Because commitSourceTask() runs after producer.commitTransaction(), this could still result in duplicate records. Can you please help me out here?
[jira] [Commented] (KAFKA-13679) Superfluous node disconnected log messages
    [ https://issues.apache.org/jira/browse/KAFKA-13679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804157#comment-17804157 ]

Pritam Kumar commented on KAFKA-13679:
--------------------------------------

[~ChrisEgerton] any thoughts on the above issue? It is flooding the logs. Thanks.

> Superfluous node disconnected log messages
> ------------------------------------------
>
>                 Key: KAFKA-13679
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13679
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.1.0
>            Reporter: Philip Bourke
>            Priority: Minor
>
> In Kafka 3.1 the "{_}Node x disconnected{_}" log message in the
> {{NetworkClient.java}} class was changed from DEBUG to INFO -
> [https://github.com/apache/kafka/commit/79d97bd29d059e8ba8ee7726b49d76e03e281059#diff-dcc1af531d191de8da1e23ad6d878a3efc463ba4670dbcf2896295a9dacd1c18R935]
> Now my application logs are full of node disconnected messages and it would
> indicate that there may be a connectivity problem. However I can see that the
> logs are getting written every 5 minutes exactly, and it's the AdminClient
> that is writing the logs.
> {code:bash}
> 2022-02-16 14:45:39,277 [d-60105f051cdb-admin] INFO  o.apache.kafka.clients.NetworkClient - [AdminClient clientId=desktop-session-internal-user-streamer-v1-9888ff1d-446e-40cd-88dd-60105f051cdb-admin] Node 1 disconnected.
> {code}
> My guess is that it may be the
> [connections.max.idle.ms|https://kafka.apache.org/documentation/#adminclientconfigs_connections.max.idle.ms]
> config setting, and there is in fact no issue with connectivity to the
> brokers?
> I'm raising this ticket here because the logs are full of these repetitive
> messages indicating an issue and setting off alarm bells, and also because I
> did not get a response on the confluent forum or in any slack channels.
[jira] [Updated] (KAFKA-15680) Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing
     [ https://issues.apache.org/jira/browse/KAFKA-15680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar updated KAFKA-15680:
---------------------------------
    Fix Version/s: 3.6.1

> Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15680
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15680
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>    Affects Versions: 3.0.1
>            Reporter: Pritam Kumar
>            Assignee: Pritam Kumar
>            Priority: Minor
>             Fix For: 3.6.1
>
>
> * In ICR (Incremental Cooperative Rebalancing) mode, whenever a new worker,
> say Worker3, joins, the leader, say Worker1, computes a new global
> assignment, which results in the revocation of some tasks from each existing
> worker, i.e. Worker1 and Worker2.
> * Once the new member's join completes, the
> *ConsumerCoordinator.onJoinComplete()* method is called. It computes the
> newly assigned partitions and the revoked partitions and updates the
> subscription object.
> * If there are revocations, which is checked via the "revokedPartitions"
> list, the *invokePartitionsRevoked()* method is called. It internally calls
> "updatePartitionCount()", which reads the partitions from the *assignment*
> object, which has not yet been updated with the new assignment.
> * The *assignment* is updated only just before the
> *invokePartitionsAssigned()* method is called, by invoking
> *subscriptions.assignFromSubscribed(assignedPartitions);*
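The ordering problem described in the KAFKA-15680 bullets can be illustrated with a toy model. This is a hedged sketch, not the `ConsumerCoordinator` or Connect implementation: the class `PartitionCountOrderingDemo`, its `partitionCount` field, and the `assignBeforeRevokeCallback` flag are all hypothetical, and the model assumes the metric is refreshed only inside the revocation callback.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: hypothetical names, not the real ConsumerCoordinator.
final class PartitionCountOrderingDemo {
    private Set<String> assignment;
    private int partitionCount; // stand-in for the "partition-count" metric

    PartitionCountOrderingDemo(List<String> owned) {
        this.assignment = new LinkedHashSet<>(owned);
        this.partitionCount = assignment.size();
    }

    /** Revocation callback: refreshes the metric from whatever the assignment currently holds. */
    private void onPartitionsRevoked() {
        partitionCount = assignment.size();
    }

    /**
     * Applies a new assignment and returns the metric value afterwards.
     * assignBeforeRevokeCallback == false mirrors the order described in the
     * report: the callback fires first, the assignment is replaced afterwards.
     */
    int onJoinComplete(List<String> assigned, boolean assignBeforeRevokeCallback) {
        Set<String> revoked = new LinkedHashSet<>(assignment);
        revoked.removeAll(assigned); // owned - assigned

        if (assignBeforeRevokeCallback) {
            assignment = new LinkedHashSet<>(assigned);
        }
        if (!revoked.isEmpty()) {
            onPartitionsRevoked();
        }
        if (!assignBeforeRevokeCallback) {
            assignment = new LinkedHashSet<>(assigned);
        }
        return partitionCount;
    }

    public static void main(String[] args) {
        List<String> owned = List.of("partition-1", "partition-2");
        List<String> assigned = List.of("partition-1");
        System.out.println(new PartitionCountOrderingDemo(owned).onJoinComplete(assigned, false)); // 2
        System.out.println(new PartitionCountOrderingDemo(owned).onJoinComplete(assigned, true));  // 1
    }
}
```

With the callback fired first, the model keeps reporting the old count of 2 even though only one partition remains; swapping the two steps yields 1.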
[jira] [Created] (KAFKA-15680) Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing
Pritam Kumar created KAFKA-15680:
------------------------------------

             Summary: Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing
                 Key: KAFKA-15680
                 URL: https://issues.apache.org/jira/browse/KAFKA-15680
             Project: Kafka
          Issue Type: Bug
          Components: connect
    Affects Versions: 3.0.1
            Reporter: Pritam Kumar
            Assignee: Pritam Kumar
[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
     [ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar updated KAFKA-14220:
---------------------------------
    Description:     (was: Issue:

When partitions are revoked, the "partition count" metric is updated before the new set of assignments is applied. In the "onJoinComplete" method of the "ConsumerCoordinator" class, "invokePartitionsRevoked" is called before "subscriptions.assignFromSubscribed(assignedPartitions)". As a result, the old assigned partition count keeps being reported, even after later rebalances.

Demo:

Suppose the current assignment is:
Assigned partitions: [partition-1, partition-2]
Current owned partitions: []
Added partitions (assigned - owned): [partition-1, partition-2]
Revoked partitions (owned - assigned): []

After that, another worker joins, and as a result "partition-2" has to be revoked:
Assigned partitions: [partition-1]
Current owned partitions: [partition-1, partition-2]
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): [partition-2]

The "assignment" is updated with the new assignment by the following line:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
Line 463 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
{code:java}
subscriptions.assignFromSubscribed(assignedPartitions);
{code}

But before that happens, "{*}updatePartitionCount{*}()" has already been called via "{*}invokePartitionsRevoked{*}":
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
Line 443 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
{code:java}
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
{code}

Because of this, when the "{*}assignedPartitions{*}" of the "{*}consumer{*}" are read via the following method:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
Line 892 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
{code:java}
public Set<TopicPartition> assignment() {
{code}
the "{*}assignedPartitions{*}" have not yet been updated.

Solution:
As part of the bug fix for KAFKA-12622, introduce code changes to update the partition count metric after the newly assigned partitions are registered.)

> "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14220
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14220
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.1
>            Reporter: Pritam Kumar
>            Priority: Major
>
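The "Added" and "Revoked" lines in the KAFKA-14220 demo are plain set differences (assigned - owned and owned - assigned). A minimal sketch that reproduces both rounds of the demo follows; `RebalanceDiffDemo` and `minus` are hypothetical names used only for illustration, and partitions are modelled as strings rather than Kafka's `TopicPartition`.

```java
import java.util.Set;
import java.util.TreeSet;

// Illustration only: recomputes the added/revoked sets from the demo above.
final class RebalanceDiffDemo {

    /** Returns the elements of left that are not in right, in sorted order. */
    static Set<String> minus(Set<String> left, Set<String> right) {
        Set<String> result = new TreeSet<>(left);
        result.removeAll(right);
        return result;
    }

    public static void main(String[] args) {
        // Round 1: nothing owned yet, two partitions assigned.
        Set<String> owned = Set.of();
        Set<String> assigned = Set.of("partition-1", "partition-2");
        System.out.println("added=" + minus(assigned, owned) + " revoked=" + minus(owned, assigned));
        // prints: added=[partition-1, partition-2] revoked=[]

        // Round 2: another worker joined, partition-2 must be given up.
        owned = assigned;
        assigned = Set.of("partition-1");
        System.out.println("added=" + minus(assigned, owned) + " revoked=" + minus(owned, assigned));
        // prints: added=[] revoked=[partition-2]
    }
}
```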
[jira] [Resolved] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
     [ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar resolved KAFKA-14220.
----------------------------------
      Reviewer:   (was: Chris Egerton)
    Resolution: Abandoned

> "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14220
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14220
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.1
>            Reporter: Pritam Kumar
>            Priority: Major
>
[jira] [Closed] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar closed KAFKA-14220.

> "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.0.1
> Reporter: Pritam Kumar
> Priority: Major
>
> Issue:
> When partitions are revoked, the "partition count" metric is updated before the new set of assignments is applied. In the "onJoinComplete" function of the "ConsumerCoordinator" class, the "invokePartitionsRevoked" method is called before "subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As a result, the old assigned partition count keeps being recorded, even after subsequent rebalances.
> Demo:
> Suppose the current assignment is:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> Then another worker joins, and as a result "partition-2" has to be revoked:
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> The "assignment" still needs to be updated with this new assignment via the following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>
> But before this happens, "{*}updatePartitionCount{*}()" is already called via "{*}invokePartitionsRevoked{*}":
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));|
>
> Because of this, when the "{*}assignedPartitions{*}" of the "{*}consumer{*}" are read via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set<TopicPartition> assignment() {|
>
> the "{*}assignedPartitions{*}" have not yet been updated.
> Solution: As part of the bug fix to KAFKA-12622, introduce code changes so that the partition count metric is updated after the newly assigned partitions are registered.
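The ordering problem described in the issue can be illustrated with a toy model. This is only a sketch under stated assumptions: `RevocationOrderSketch`, `ToySubscriptions`, and `partitionCountSeenByCallback` are hypothetical names, and the code does not use or reproduce Kafka's `ConsumerCoordinator` or `SubscriptionState`. It only shows that a count read inside the revocation step depends on whether the assignment was replaced before or after that step.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class RevocationOrderSketch {

    // Toy stand-in for the consumer's subscription state (not Kafka's class).
    static final class ToySubscriptions {
        private Set<String> assigned = new LinkedHashSet<>();

        void assignFromSubscribed(Set<String> newAssignment) {
            assigned = new LinkedHashSet<>(newAssignment);
        }

        Set<String> assignment() {
            return assigned;
        }
    }

    // Returns the partition count that a revocation callback would observe.
    // assignBeforeCallback = false models the order described in the issue;
    // assignBeforeCallback = true models the proposed order.
    public static int partitionCountSeenByCallback(boolean assignBeforeCallback) {
        ToySubscriptions subscriptions = new ToySubscriptions();
        // State before the rebalance: both partitions are owned.
        subscriptions.assignFromSubscribed(
                new LinkedHashSet<>(Arrays.asList("partition-1", "partition-2")));
        // New assignment after partition-2 is revoked.
        Set<String> newAssignment = new LinkedHashSet<>(Arrays.asList("partition-1"));

        int observed;
        if (assignBeforeCallback) {
            subscriptions.assignFromSubscribed(newAssignment);
            observed = subscriptions.assignment().size(); // callback reads the new state
        } else {
            observed = subscriptions.assignment().size(); // callback reads the stale state
            subscriptions.assignFromSubscribed(newAssignment);
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("callback before assignment update: "
                + partitionCountSeenByCallback(false));
        System.out.println("callback after assignment update: "
                + partitionCountSeenByCallback(true));
    }
}
```

In this toy model the first ordering observes a count of 2 and the second observes 1, which mirrors the stale "partition count" symptom reported above.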
[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar updated KAFKA-14220:
    Flags: (was: Patch,Important)

> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.0.1
> Reporter: Pritam Kumar
> Priority: Major
[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar updated KAFKA-14220:
    Labels: (was: pull-request-available)

> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.0.1
> Reporter: Pritam Kumar
> Priority: Major
[jira] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220 ]

Pritam Kumar deleted comment on KAFKA-14220:
    was (Author: JIRAUSER295638):
    The following is the patch made and verified: *https://github.com/apache/kafka/pull/12622*

> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.0.1
> Reporter: Pritam Kumar
> Priority: Major
> Labels: pull-request-available
[jira] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220 ]

Pritam Kumar deleted comment on KAFKA-14220:
    was (Author: JIRAUSER295638):
    https://github.com/apache/kafka/pull/12622

> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.0.1
> Reporter: Pritam Kumar
> Priority: Major
> Labels: pull-request-available
[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar updated KAFKA-14220:
    Component/s: (was: KafkaConnect)

> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.0.1
> Reporter: Pritam Kumar
> Priority: Major
[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pritam Kumar updated KAFKA-14220:
    Description:
Issue:
When partitions are revoked, the "partition count" metric is updated before the new set of assignments is applied. In the "onJoinComplete" function of the "ConsumerCoordinator" class, the "invokePartitionsRevoked" method is called before "subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As a result, the old assigned partition count keeps being recorded, even after subsequent rebalances.
Demo:
Suppose the current assignment is:
Assigned partitions: [partition-1, partition-2]
Current owned partitions: []
Added partitions (assigned - owned): [partition-1, partition-2]
Revoked partitions (owned - assigned): []
Then another worker joins, and as a result "partition-2" has to be revoked:
Assigned partitions: [partition-1]
Current owned partitions: [partition-1, partition-2]
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): [partition-2]
The "assignment" still needs to be updated with this new assignment via the following logic:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
Line 463 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||subscriptions.assignFromSubscribed(assignedPartitions);|

But before this happens, "{*}updatePartitionCount{*}()" is already called via "{*}invokePartitionsRevoked{*}":
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
Line 443 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));|

Because of this, when the "{*}assignedPartitions{*}" of the "{*}consumer{*}" are read via the following:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
Line 892 in [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||public Set<TopicPartition> assignment() {|

the "{*}assignedPartitions{*}" have not yet been updated.
Solution: As part of the bug fix to KAFKA-12622, introduce code changes so that the partition count metric is updated after the newly assigned partitions are registered.

    was:
When partitions are revoked, the "partition count" metric is updated before the new set of assignments is applied. In the "{*}onJoinComplete{*}" function of the "{*}ConsumerCoordinator{*}" class, the "{*}invokePartitionsRevoked{*}" method is called before "{*}subscriptions.assignFromSubscribed(assignedPartitions){*}" of the same class. As a result, the *old assigned partition count* keeps being recorded, even after subsequent rebalances.

> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.0.1
> Reporter: Pritam Kumar
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.0.1
[jira] [Commented] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
[ https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17603056#comment-17603056 ]

Pritam Kumar commented on KAFKA-14220:

The following is the patch made and verified: *https://github.com/apache/kafka/pull/12622*

> "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.0.1
> Reporter: Pritam Kumar
> Priority: Major
>
> When partitions are revoked, the "partition count" metric is updated before the new set of assignments is applied. In the "{*}onJoinComplete{*}" function of the "{*}ConsumerCoordinator{*}" class, the "{*}invokePartitionsRevoked{*}" method is called before "{*}subscriptions.assignFromSubscribed(assignedPartitions){*}" of the same class. As a result, the *old assigned partition count* keeps being recorded, even after subsequent rebalances.
[jira] [Created] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
Pritam Kumar created KAFKA-14220:

    Summary: "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.
    Key: KAFKA-14220
    URL: https://issues.apache.org/jira/browse/KAFKA-14220
    Project: Kafka
    Issue Type: Bug
    Components: KafkaConnect
    Affects Versions: 3.0.1
    Reporter: Pritam Kumar

When partitions are revoked, the "partition count" metric is updated before the new set of assignments is applied. In the "{*}onJoinComplete{*}" function of the "{*}ConsumerCoordinator{*}" class, the "{*}invokePartitionsRevoked{*}" method is called before "{*}subscriptions.assignFromSubscribed(assignedPartitions){*}" of the same class. As a result, the *old assigned partition count* keeps being recorded, even after subsequent rebalances.