Re: [PR] KAFKA-16066: Upgrade apacheds to 2.0.0.AM27(change apache kerby) [kafka]
highluck commented on PR #15277: URL: https://github.com/apache/kafka/pull/15277#issuecomment-1913048227 @divijvaidya Please review the code :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16066: Upgrade apacheds to 2.0.0.AM27(change apache kerby) [kafka]
highluck opened a new pull request, #15277: URL: https://github.com/apache/kafka/pull/15277

- Replace apacheds with Apache Kerby
- Removed the apacheds dependency, since it is no longer needed

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[PR] Deprecate long based range queries in SessionStore [kafka]
armdave opened a new pull request, #15276: URL: https://github.com/apache/kafka/pull/15276 [KAFKA-13130](https://issues.apache.org/jira/browse/KAFKA-13130) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]
DL1231 commented on PR #15211: URL: https://github.com/apache/kafka/pull/15211#issuecomment-1912974892 @dajac Addressed your review comments. PTAL when you get a chance!
[jira] [Commented] (KAFKA-16066) Upgrade apacheds to 2.0.0.AM27
[ https://issues.apache.org/jira/browse/KAFKA-16066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811467#comment-17811467 ]

highluck commented on KAFKA-16066:
----------------------------------

thanks [~divijvaidya] :)

> Upgrade apacheds to 2.0.0.AM27
> ------------------------------
>
>                 Key: KAFKA-16066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16066
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Divij Vaidya
>            Assignee: highluck
>            Priority: Major
>              Labels: newbie, newbie++
>
> We are currently using a very old dependency. Notably, apacheds is only used
> for testing when we use MiniKdc, hence, there is nothing stopping us from
> upgrading it.
> Notably, apacheds has removed the component
> org.apache.directory.server:apacheds-protocol-kerberos in favour of using
> Apache Kerby, hence, we need to make changes in MiniKdc.scala for this
> upgrade to work correctly.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Assigned] (KAFKA-16066) Upgrade apacheds to 2.0.0.AM27
[ https://issues.apache.org/jira/browse/KAFKA-16066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] highluck reassigned KAFKA-16066: Assignee: highluck > Upgrade apacheds to 2.0.0.AM27
Re: [PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]
DL1231 commented on code in PR #15211: URL: https://github.com/apache/kafka/pull/15211#discussion_r1468309807

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:

@@ -739,6 +747,12 @@ public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
     return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs));
 }
+@Override
+public boolean isInStatesCaseInsensitive(List<String> statesFilter, long committedOffset) {
+    Set<String> caseInsensitiveFilterSet = statesFilter.stream().map(String::toLowerCase).map(String::trim).collect(Collectors.toSet());

Review Comment: Done.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:

@@ -57,10 +57,15 @@ public String toString() {
  */
 String stateAsString(long committedOffset);
+/**
+ * @return The {@link GroupType}'s lower-case String representation based on the committed offset.
+ */
+String stateAsLowerCaseString(long committedOffset);

Review Comment: Done.
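The pattern under review (normalize the state filter once, then do a case-insensitive membership check) can be sketched in plain Java. This is a minimal illustration, not the actual `ConsumerGroup` API; the method name `isInStates` and the state strings are assumptions for the example:

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

public class StateFilter {
    // Normalize the caller-supplied filter once: trim whitespace and
    // lower-case each entry, so lookups ignore case regardless of input.
    static boolean isInStates(List<String> statesFilter, String currentState) {
        Set<String> normalized = statesFilter.stream()
            .map(String::trim)
            .map(s -> s.toLowerCase(Locale.ROOT))
            .collect(Collectors.toSet());
        return normalized.contains(currentState.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(isInStates(List.of(" Stable ", "EMPTY"), "stable")); // true
        System.out.println(isInStates(List.of("Dead"), "stable"));              // false
    }
}
```

Building the normalized set once per request (rather than lower-casing inside the lookup) keeps the comparison logic in a single place.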
Re: [PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]
DL1231 commented on code in PR #15211: URL: https://github.com/apache/kafka/pull/15211#discussion_r1468309502

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroupState.java:

@@ -114,15 +115,20 @@ public enum ClassicGroupState {
     DEAD.addValidPreviousStates(STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, EMPTY, DEAD);
 }
-ClassicGroupState(String name) {
+ClassicGroupState(String name, String lowerCaseName) {
     this.name = name;
+    this.lowerCaseName = lowerCaseName;
 }

 @Override
 public String toString() {
     return name;
 }

+public String toLowerCaseString() {

Review Comment: Thanks for the suggestion, I added the `toCapitalizedString` to `ConsumerGroup.ConsumerGroupState`.
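The design being discussed, pre-computing the lower-case name when the enum constant is created instead of calling `toLowerCase` on every request, can be sketched as below. The enum constants are illustrative, and unlike the diff above (which passes the lower-case name into the constructor) this sketch derives it from the display name:

```java
import java.util.Locale;

public class EnumNameCache {
    enum GroupState {
        PREPARING_REBALANCE("PreparingRebalance"),
        COMPLETING_REBALANCE("CompletingRebalance"),
        STABLE("Stable"),
        DEAD("Dead"),
        EMPTY("Empty");

        private final String name;
        private final String lowerCaseName; // computed once per constant, not per call

        GroupState(String name) {
            this.name = name;
            this.lowerCaseName = name.toLowerCase(Locale.ROOT);
        }

        @Override
        public String toString() {
            return name;
        }

        public String toLowerCaseString() {
            return lowerCaseName;
        }
    }

    public static void main(String[] args) {
        System.out.println(GroupState.PREPARING_REBALANCE.toLowerCaseString()); // preparingrebalance
        System.out.println(GroupState.STABLE);                                  // Stable
    }
}
```

Caching the lower-case form in a final field means list-groups filtering pays the conversion cost once at class load, not on every heartbeat or admin request.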
Re: [PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]
DL1231 commented on code in PR #15211: URL: https://github.com/apache/kafka/pull/15211#discussion_r1468299153

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroupState.java:

@@ -114,15 +115,20 @@ public enum ClassicGroupState {
     DEAD.addValidPreviousStates(STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, EMPTY, DEAD);
 }
-ClassicGroupState(String name) {
+ClassicGroupState(String name, String lowerCaseName) {

Review Comment: Done.
Re: [PR] Change `CreateTopicPolicy.RequestMetadata` to always provide partitions, replication factor and replica assignments. [kafka]
github-actions[bot] commented on PR #14655: URL: https://github.com/apache/kafka/pull/14655#issuecomment-1912947638 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16156: -- Component/s: consumer > System test failing for new consumer on endOffsets with negative timestamps > --- > > Key: KAFKA-16156 > URL: https://issues.apache.org/jira/browse/KAFKA-16156 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > > TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid > negative timestamp". > Trace: > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Received ListOffsetResponse > ListOffsetsResponseData(throttleTimeMs=0, > topics=[ListOffsetsTopicResponse(name='input-topic', > partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, > oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from > broker worker2:9092 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,932] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Handling ListOffsetResponse > response for input-topic-0. Fetched offset 42804, timestamp -1 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Updating last stable offset for > partition input-topic-0 to 42804 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,933] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Fetch offsets completed > successfully for partitions and timestamps {input-topic-0=-1}. 
Result > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862 > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,933] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] No events to process > (org.apache.kafka.clients.consumer.internals.events.EventProcessor) > [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event > loop (org.apache.kafka.tools.TransactionalMessageCopier) > org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: > Invalid negative timestamp > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) > at > org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) > at > org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) > at > org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) > at > org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292) > Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp > at > 
org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39) > at > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253) > at > org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at >
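The failure in the trace comes from building an `OffsetAndTimestamp`-style result from a ListOffsets response whose timestamp is -1, the broker's "no timestamp" sentinel. A defensive mapping, sketched here in plain Java with an illustrative result type (not the real Kafka classes), would return empty for that sentinel instead of constructing an object that rejects negative values:

```java
import java.util.Optional;

public class OffsetResultMapper {
    // Illustrative stand-in for OffsetAndTimestamp: like the real class,
    // it rejects negative timestamps in its constructor.
    record OffsetWithTimestamp(long offset, long timestamp) {
        OffsetWithTimestamp {
            if (timestamp < 0) {
                throw new IllegalArgumentException("Invalid negative timestamp");
            }
        }
    }

    // -1 means "no timestamp available" in a ListOffsets response, so map it
    // to empty rather than letting the constructor throw.
    static Optional<OffsetWithTimestamp> toResult(long offset, long timestamp) {
        if (timestamp < 0) {
            return Optional.empty();
        }
        return Optional.of(new OffsetWithTimestamp(offset, timestamp));
    }

    public static void main(String[] args) {
        System.out.println(toResult(42804, -1));   // Optional.empty
        System.out.println(toResult(42804, 1700)); // present
    }
}
```

The offsets in the example mirror the ones from the trace (offset 42804, timestamp -1); the guard is where the sketch differs from the failing code path.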
[jira] [Updated] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16156: -- Labels: consumer-threading-refactor (was: ) > System test failing for new consumer on endOffsets with negative timestamps
[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
[ https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16113: -- Parent: (was: KAFKA-14246) Issue Type: Improvement (was: Sub-task) > AsyncKafkaConsumer: Add missing offset commit metrics > - > > Key: KAFKA-16113 > URL: https://issues.apache.org/jira/browse/KAFKA-16113 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The following metrics are missing from the AsyncKafkaConsumer: > commit-latency-avg > commit-latency-max > commit-rate > commit-total > committed-time-ns-total
[jira] [Updated] (KAFKA-16199) Prune the event queue if events have expired before starting
[ https://issues.apache.org/jira/browse/KAFKA-16199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16199: -- Parent: (was: KAFKA-15974) Issue Type: Improvement (was: Sub-task) > Prune the event queue if events have expired before starting > > > Key: KAFKA-16199 > URL: https://issues.apache.org/jira/browse/KAFKA-16199 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor > Fix For: 3.8.0 > >
[jira] (KAFKA-16190) Member should send full heartbeat when rejoining
[ https://issues.apache.org/jira/browse/KAFKA-16190 ] Kirk True deleted comment on KAFKA-16190: --- was (Author: kirktrue): [~lianetm] is this something we want in 3.8? > Member should send full heartbeat when rejoining > > > Key: KAFKA-16190 > URL: https://issues.apache.org/jira/browse/KAFKA-16190 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Quoc Phong Dang >Priority: Major > Labels: client-transitions-issues, kip-848-client-support, newbie > Fix For: 3.8.0 > > > The heartbeat request builder should make sure that all fields are sent in > the heartbeat request when the consumer rejoins (currently the > HeartbeatRequestManager request builder is only reset on failure scenarios). > This should fix the issue that a client that is subscribed to a topic and > gets fenced, should try to rejoin providing the same subscription it had. > Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses > this case given that it does explicitly change the subscription when it gets > fenced. We should ensure we test a consumer that keeps its same initial > subscription when it rejoins after being fenced.
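The fix described in this issue (reset the heartbeat request builder on rejoin so the member resends every field, including an unchanged subscription) can be sketched as follows. The builder shape and field names here are illustrative stand-ins, not the actual `HeartbeatRequestManager` API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class HeartbeatBuilderSketch {
    private final Map<String, String> sentFields = new HashMap<>();

    // Build a delta request: include only fields that changed since the
    // last heartbeat that was sent.
    Map<String, String> build(Map<String, String> currentState) {
        Map<String, String> toSend = new HashMap<>();
        for (Map.Entry<String, String> e : currentState.entrySet()) {
            if (!Objects.equals(sentFields.get(e.getKey()), e.getValue())) {
                toSend.put(e.getKey(), e.getValue());
            }
        }
        sentFields.putAll(currentState);
        return toSend;
    }

    // On rejoin (e.g. after being fenced), forget what was already sent so
    // the next heartbeat carries the full state -- even if the subscription
    // is identical to the one the member had before being fenced.
    void onRejoin() {
        sentFields.clear();
    }

    public static void main(String[] args) {
        HeartbeatBuilderSketch builder = new HeartbeatBuilderSketch();
        Map<String, String> state = Map.of("subscribedTopics", "input-topic");
        System.out.println(builder.build(state)); // full state on first send
        System.out.println(builder.build(state)); // empty delta: nothing changed
        builder.onRejoin();
        System.out.println(builder.build(state)); // full state again after rejoin
    }
}
```

Without the `onRejoin` reset, the second `build` after fencing would produce an empty delta, which is exactly the bug the issue describes: the rejoining member never re-announces its subscription.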
[jira] [Updated] (KAFKA-16200) Ensure RequestManager handling of expired timeouts are consistent
[ https://issues.apache.org/jira/browse/KAFKA-16200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16200: -- Parent: (was: KAFKA-15974) Issue Type: Improvement (was: Sub-task) > Ensure RequestManager handling of expired timeouts are consistent > - > > Key: KAFKA-16200 > URL: https://issues.apache.org/jira/browse/KAFKA-16200 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor > Fix For: 3.8.0 > >
[jira] [Updated] (KAFKA-16199) Prune the event queue if events have expired before starting
[ https://issues.apache.org/jira/browse/KAFKA-16199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16199: -- Priority: Critical (was: Major) > Prune the event queue if events have expired before starting > > > Key: KAFKA-16199 > URL: https://issues.apache.org/jira/browse/KAFKA-16199 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor > Fix For: 3.8.0 > >
[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs
[ https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16185: -- Fix Version/s: 3.8.0 > Fix client reconciliation of same assignment received in different epochs > -- > > Key: KAFKA-16185 > URL: https://issues.apache.org/jira/browse/KAFKA-16185 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > Currently, the intention in the client state machine is that the client > always reconciles whatever it has pending that has not been removed by the > coordinator. > There is still an edge case where this does not happen, and the client might > get stuck JOINING/RECONCILING, with a pending reconciliation (delayed), and > it receives the same assignment, but in a new epoch (ex. after being FENCED). > First time it receives the assignment it takes no action, as it already has > it as pending to reconcile, but when the reconciliation completes it discards > the result because the epoch changed. And this is wrong. Note that after > sending the assignment with the new epoch one time, the broker continues to > send null assignments. > Here is a sample sequence leading to the client stuck JOINING: > - client joins, epoch 0 > - client receives assignment tp1, stuck RECONCILING, epoch 1 > - member gets FENCED on the coord, coord bumps epoch to 2 > - client tries to rejoin (JOINING), epoch 0 provided by the client > - new member added to the group (group epoch bumped to 3), client receives > same assignment that is currently trying to reconcile (tp1), but with epoch 3 > - previous reconciliation completes, but will discard the result because it > will notice that the memberHasRejoined (memberEpochOnReconciliationStart != > memberEpoch). 
Client is stuck JOINING, with the server sending null target > assignment because it hasn't changed since the last one sent (tp1) > (We should end up with a test similar to the existing > #testDelayedReconciliationResultDiscardedIfMemberRejoins but with the case > that the member receives the same assignment after being fenced and rejoining)
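The stuck-JOINING sequence above hinges on a single check: a completed reconciliation is discarded when the member epoch observed at its start no longer matches the current epoch. A minimal simulation of that sequence, with illustrative names (the real check lives in the client membership state machine):

```java
public class ReconciliationSketch {
    // The memberHasRejoined check described above: a delayed reconciliation's
    // result is thrown away if the member epoch changed while it was running.
    static boolean resultDiscarded(int epochAtReconciliationStart, int currentMemberEpoch) {
        return epochAtReconciliationStart != currentMemberEpoch;
    }

    public static void main(String[] args) {
        int epochAtStart = 1;     // assignment tp1 first received at epoch 1
        int epochAfterRejoin = 3; // same assignment re-sent after fencing and rejoin
        // The completed reconciliation is discarded. Because the broker then
        // sends null assignments (the target has not changed), nothing
        // re-triggers reconciliation and the member stays JOINING.
        System.out.println(resultDiscarded(epochAtStart, epochAfterRejoin)); // true
    }
}
```

The discard itself is correct for genuinely stale results; the bug is that nothing re-reconciles the (still valid) assignment afterwards, since the broker only re-sends a target assignment when it changes.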
[jira] [Updated] (KAFKA-16190) Member should send full heartbeat when rejoining
[ https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16190: -- Fix Version/s: 3.8.0 > Member should send full heartbeat when rejoining
[jira] [Updated] (KAFKA-16169) FencedException in commitAsync not propagated without callback
[ https://issues.apache.org/jira/browse/KAFKA-16169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16169: -- Fix Version/s: 3.8.0 > FencedException in commitAsync not propagated without callback > -- > > Key: KAFKA-16169 > URL: https://issues.apache.org/jira/browse/KAFKA-16169 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lianet Magrans >Priority: Major > Fix For: 3.8.0 > > > The javadocs for {{commitAsync()}} (w/o callback) say: > @throws org.apache.kafka.common.errors.FencedInstanceIdException if this > consumer instance gets fenced by broker. > > If no callback is passed into {{commitAsync()}}, no offset commit > callback invocation is submitted. However, we only check for a > {{FencedInstanceIdException}} when we execute a callback. It seems to me that > with {{commitAsync()}} we would not throw at all when the consumer gets > fenced. > In any case, we need a unit test that verifies that the > {{FencedInstanceIdException}} is thrown for each version of > {{commitAsync()}}. >
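The bug described here is structural: the fenced-instance check lives in the callback-invocation path, so `commitAsync()` with no callback never reaches it. A plain-Java sketch of that shape, with a toy `MiniConsumer` standing in for the real client (all names here are illustrative):

```java
import java.util.Optional;
import java.util.function.BiConsumer;

public class CommitAsyncFencingSketch {
    static class FencedInstanceIdException extends RuntimeException {}

    // Toy consumer: a commit error is recorded, and callback invocations are
    // processed later on the application thread (as in poll()/close()).
    static class MiniConsumer {
        private Exception pendingError;
        private Runnable pendingInvocation;

        void completeCommitWith(Exception error) { this.pendingError = error; }

        // Overload WITH a callback: an invocation is enqueued, and processing
        // it rethrows the fenced error.
        void commitAsync(BiConsumer<Void, Exception> callback) {
            pendingInvocation = () -> {
                if (pendingError instanceof FencedInstanceIdException) {
                    throw (FencedInstanceIdException) pendingError;
                }
                callback.accept(null, pendingError);
            };
        }

        // Overload WITHOUT a callback: nothing is enqueued, so the fenced
        // error is silently dropped -- the gap this report describes.
        void commitAsync() { pendingInvocation = null; }

        Optional<RuntimeException> processInvocations() {
            try {
                if (pendingInvocation != null) pendingInvocation.run();
                return Optional.empty();
            } catch (RuntimeException e) {
                return Optional.of(e);
            }
        }
    }

    public static void main(String[] args) {
        MiniConsumer withCallback = new MiniConsumer();
        withCallback.completeCommitWith(new FencedInstanceIdException());
        withCallback.commitAsync((offsets, err) -> {});
        System.out.println(withCallback.processInvocations().isPresent()); // true: error surfaces

        MiniConsumer noCallback = new MiniConsumer();
        noCallback.completeCommitWith(new FencedInstanceIdException());
        noCallback.commitAsync();
        System.out.println(noCallback.processInvocations().isPresent()); // false: swallowed
    }
}
```

This is the shape the requested unit test would need to cover: both overloads, asserting that the fenced error surfaces in each.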
[jira] [Updated] (KAFKA-16178) AsyncKafkaConsumer doesn't retry joining the group after rediscovering group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-16178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16178: -- Fix Version/s: 3.8.0 > AsyncKafkaConsumer doesn't retry joining the group after rediscovering group > coordinator > > > Key: KAFKA-16178 > URL: https://issues.apache.org/jira/browse/KAFKA-16178 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Dongnuo Lyu >Assignee: Philip Nee >Priority: Critical > Labels: client-transitions-issues, consumer-threading-refactor > Fix For: 3.8.0 > > Attachments: pkc-devc63jwnj_jan19_0_debug > > > {code:java} > [2024-01-17 21:34:59,500] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Discovered group coordinator > Coordinator(key='consumer-groups-test-0', nodeId=3, > host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, > errorCode=0, errorMessage='') > (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162) > [2024-01-17 21:34:59,681] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] GroupHeartbeatRequest failed because the > group coordinator > Optional[b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: > 2147483644 rack: null)] is incorrect. Will attempt to find the coordinator > again and retry in 0ms: This is not the correct coordinator. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:407) > [2024-01-17 21:34:59,681] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Group coordinator > b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483644 rack: > null) is unavailable or invalid due to cause: This is not the correct > coordinator.. Rediscovery will be attempted. 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:136) > [2024-01-17 21:34:59,882] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Discovered group coordinator > Coordinator(key='consumer-groups-test-0', nodeId=3, > host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, > errorCode=0, errorMessage='') > (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162){code} > Some of the consumers don't consume any message. The logs show that after the > consumer starts up and successfully logs in, > # The consumer discovers the group coordinator. > # The heartbeat to join the group fails because "This is not the correct > coordinator" > # The consumer rediscovers the group coordinator. > Another heartbeat should follow the rediscovery of the group coordinator, but > there are no logs showing sign of a heartbeat request. > On the server side, there is completely no log about the group id. A > suspicion is that the consumer doesn't send a heartbeat request after > rediscovering the group coordinator.
[jira] [Commented] (KAFKA-16190) Member should send full heartbeat when rejoining
[ https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811431#comment-17811431 ] Kirk True commented on KAFKA-16190: --- [~lianetm] is this something we want in 3.8? > Member should send full heartbeat when rejoining
[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations
[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15848: -- Parent: (was: KAFKA-15974) Issue Type: Bug (was: Sub-task) > Consumer API timeout inconsistent between ConsumerDelegate implementations > -- > > Key: KAFKA-15848 > URL: https://issues.apache.org/jira/browse/KAFKA-15848 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > Fix For: 3.8.0 > > > The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and > {{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their > use and interpretation of the {{Timer}} that is supplied. > h3. tl;dr > {{AsyncKafkaConsumer}} is very literal about the timeout, whereas > {{LegacyKafkaConsumer}} seems to give a little wiggle room. > {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for > success of its operations _before_ checking the timer: > # Submit operation asynchronously > # Wait for operation to complete using {{NetworkClient.poll()}} > # Check for result > ## If successful, return success > ## If fatal failure, return failure > # Check timer > ## If timer expired, return failure > {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations: > # Submit operation asynchronously > # Wait for operation to complete using {{Future.get()}} > ## If operation timed out, {{Future.get()}} will throw a timeout error > # Check for result > ## If successful, return success > ## Otherwise, return failure > h3. How to reproduce > This causes subtle timing issues, but they can be easily reproduced via any > of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} > API. Here's a bit of code that illustrates the difference between the two > approaches. 
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a > manner similar to this: > {code:java} > public int getCount(Timer timer) { > do { > final RequestFuture<Integer> future = sendSomeRequest(partitions); > client.poll(future, timer); > if (future.isDone()) > return future.get(); > } while (timer.notExpired()); > return -1; > } > {code} > {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: > {code:java} > private int getCount(Timer timer) { > try { > CompletableFuture<Integer> future = new CompletableFuture<>(); > applicationEventQueue.add(future); > return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); > } catch (TimeoutException e) { > return -1; > } > } > {code} > The call to {{add}} enqueues the network operation, but it then _immediately_ > invokes {{Future.get()}} with the timeout to implement a time-bounded > blocking call. Since this method is being called with a timeout of 0, it > _immediately_ throws a {{{}TimeoutException{}}}. > h3. Suggested fix > TBD :( -- This message was sent by Atlassian Jira (v8.20.10#820010)
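The timing difference described in the ticket can be reproduced with plain `java.util.concurrent` types, outside of Kafka entirely. This sketch is a minimal stand-in: the `CompletableFuture` models the enqueued application event, and completing it by hand stands in for the background thread finishing the request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Minimal repro of the difference: with a 0 ms timeout, get() throws before
// the "background thread" has any chance to complete the future, whereas
// doing one round of work and then checking the result succeeds.
public class PollZeroSketch {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = new CompletableFuture<>();

        // Async-style: block on get() with the user's (zero) timeout.
        try {
            future.get(0, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("async style: immediate TimeoutException");
        }

        // Legacy-style: do one round of work first, then check the result
        // before consulting the timer.
        future.complete(42); // stands in for NetworkClient.poll() finishing the request
        if (future.isDone())
            System.out.println("legacy style: got " + future.get());
    }
}
```

The key observation is that `Future.get(0, ...)` gives the pending operation no chance to run at all, which matches the `consumer.poll(0)` failures seen in `KafkaConsumerTest`.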
[jira] [Updated] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15475: -- Parent: (was: KAFKA-15974) Issue Type: Bug (was: Sub-task) > Timeout request might retry forever even if the user API times out in > PrototypeAsyncConsumer > > > Key: KAFKA-15475 > URL: https://issues.apache.org/jira/browse/KAFKA-15475 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, kip-848-preview > Fix For: 3.8.0 > > > If a request times out in the background thread, it will be completed with a > TimeoutException, which is retriable. In the TopicMetadataRequestManager and > possibly other managers, the request might continue to be retried forever. > > There are two ways to fix this: > # Pass a timer to the manager to remove the inflight requests when it > expires. > # Pass the future to the application layer and continue to retry. -- This message was sent by Atlassian Jira (v8.20.10#820010)
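Fix option 1 from the ticket (bounding retries with a timer) can be sketched generically. `RetryLoop` and `retryUntil` are illustrative names, not Kafka APIs; the point is that a retriable failure stops being retried once a deadline passes, instead of looping forever.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Illustrative sketch of deadline-bounded retries. All names are hypothetical.
public class RetryLoop {
    // Retries `attempt` until it yields a non-null value or the deadline passes.
    static <T> T retryUntil(long deadlineMs, Supplier<T> attempt) throws TimeoutException {
        while (System.currentTimeMillis() < deadlineMs) {
            T result = attempt.get();
            if (result != null) return result;
        }
        // Deadline hit: surface a terminal timeout to the caller rather than
        // leaving the request inflight and retrying indefinitely.
        throw new TimeoutException("request expired before completing");
    }

    public static void main(String[] args) {
        long deadline = System.currentTimeMillis() + 50;
        try {
            retryUntil(deadline, () -> null); // an attempt that never succeeds
        } catch (TimeoutException e) {
            System.out.println("expired as expected");
        }
    }
}
```

Option 2 from the ticket would instead hand the future back to the application layer, which then applies its own user-supplied timeout when blocking on it.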
[jira] [Updated] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15305: -- Parent: (was: KAFKA-15974) Issue Type: Bug (was: Sub-task) > The background thread should try to process the remaining task until the > shutdown timer is expired > -- > > Key: KAFKA-15305 > URL: https://issues.apache.org/jira/browse/KAFKA-15305 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > While working on https://issues.apache.org/jira/browse/KAFKA-15304: the > close() API supplies a timeout parameter so that the consumer can have a > grace period to process things before shutting down. The background thread > currently doesn't honor that: when close() is initiated, it immediately > closes all of its dependencies. > > This might not be desirable because there could be remaining tasks to be > processed before closing. The correct thing to do may be to first stop > accepting API requests, then let runOnce() continue to run until the > shutdown timer expires, and finally force-close all of its dependencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
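The shutdown sequence suggested in the ticket (stop accepting new work, drain remaining tasks while the close timer has time left, then force-close) can be sketched with stdlib types only. `drainAndClose` is a hypothetical helper, not the actual background-thread code.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hedged sketch of a timer-bounded graceful shutdown. Names are illustrative.
public class GracefulShutdownSketch {
    // Drains queued tasks until the timeout elapses or no tasks remain,
    // then force-closes by discarding whatever is left. Returns how many
    // tasks were processed within the grace period.
    static int drainAndClose(Queue<Runnable> tasks, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int processed = 0;
        // Grace period: keep processing (the runOnce() analogue) until the
        // shutdown timer expires.
        while (!tasks.isEmpty() && System.currentTimeMillis() < deadline) {
            tasks.poll().run();
            processed++;
        }
        // Timer expired (or queue drained): force close regardless.
        tasks.clear();
        return processed;
    }

    public static void main(String[] args) {
        Queue<Runnable> tasks = new ArrayDeque<>();
        for (int i = 0; i < 3; i++) tasks.add(() -> { });
        System.out.println(drainAndClose(tasks, 1000)); // 3: all drained within the grace period
    }
}
```

With `timeoutMs = 0` the loop never runs and all pending tasks are dropped, which models today's immediate-close behavior the ticket wants to avoid.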
[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
[ https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16115: -- Parent: (was: KAFKA-14246) Issue Type: Improvement (was: Sub-task) > AsyncKafkaConsumer: Add missing heartbeat metrics > - > > Key: KAFKA-16115 > URL: https://issues.apache.org/jira/browse/KAFKA-16115 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The following metrics are missing: > |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| > |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| > |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| > |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| > |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16190) Member should send full heartbeat when rejoining
[ https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16190: -- Labels: client-transitions-issues kip-848-client-support newbie (was: client-transitions-issues newbie) > Member should send full heartbeat when rejoining > > > Key: KAFKA-16190 > URL: https://issues.apache.org/jira/browse/KAFKA-16190 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Quoc Phong Dang >Priority: Major > Labels: client-transitions-issues, kip-848-client-support, newbie > > The heartbeat request builder should make sure that all fields are sent in > the heartbeat request when the consumer rejoins (currently the > HeartbeatRequestManager request builder is only reset on failure scenarios). > This should fix the issue so that a client that is subscribed to a topic and > gets fenced tries to rejoin providing the same subscription it had. > Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses > this case given that it does explicitly change the subscription when it gets > fenced. We should ensure we test a consumer that keeps its same initial > subscription when it rejoins after being fenced. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs
[ https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16185: -- Labels: client-transitions-issues kip-848-client-support (was: client-transitions-issues) > Fix client reconciliation of same assignment received in different epochs > -- > > Key: KAFKA-16185 > URL: https://issues.apache.org/jira/browse/KAFKA-16185 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: client-transitions-issues, kip-848-client-support > > Currently, the intention in the client state machine is that the client > always reconciles whatever it has pending that has not been removed by the > coordinator. > There is still an edge case where this does not happen, and the client might > get stuck JOINING/RECONCILING, with a pending reconciliation (delayed), and > it receives the same assignment, but in a new epoch (e.g. after being FENCED). > The first time it receives the assignment, it takes no action, as it already has > it as pending to reconcile, but when the reconciliation completes it discards > the result because the epoch changed. And this is wrong. Note that after > sending the assignment with the new epoch one time, the broker continues to > send null assignments. > Here is a sample sequence leading to the client stuck JOINING: > - client joins, epoch 0 > - client receives assignment tp1, stuck RECONCILING, epoch 1 > - member gets FENCED on the coord, coord bumps epoch to 2 > - client tries to rejoin (JOINING), epoch 0 provided by the client > - new member added to the group (group epoch bumped to 3), client receives the > same assignment it is currently trying to reconcile (tp1), but with epoch 3 > - previous reconciliation completes, but will discard the result because it > will notice that the memberHasRejoined (memberEpochOnReconciliationStart != > memberEpoch). 
The client is stuck JOINING, with the server sending a null target > assignment because it hasn't changed since the last one sent (tp1). > (We should end up with a test similar to the existing > #testDelayedReconciliationResultDiscardedIfMemberRejoins, but covering the case > where the member receives the same assignment after being fenced and rejoining.) -- This message was sent by Atlassian Jira (v8.20.10#820010)
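The discard condition described in the sequence above can be modeled as a tiny predicate. This is a toy model (all names are hypothetical, not the actual membership-manager fields): the current check drops a completed reconciliation on any epoch bump, even when the target assignment is byte-for-byte the same.

```java
import java.util.Set;

// Toy model of the reconciliation-discard edge case. Illustrative names only.
public class ReconciliationEpochSketch {
    // Current behavior per the ticket: an epoch bump alone (e.g. fenced and
    // rejoined) discards the completed reconciliation result.
    static boolean currentlyDiscards(int epochOnReconciliationStart, int currentEpoch) {
        return epochOnReconciliationStart != currentEpoch;
    }

    // One possible direction implied by the ticket (an assumption, not the
    // agreed fix): only discard when the target assignment actually changed.
    static boolean discardsIfAssignmentChanged(Set<String> reconciled, Set<String> currentTarget) {
        return !reconciled.equals(currentTarget);
    }

    public static void main(String[] args) {
        Set<String> tp1 = Set.of("tp1");
        // Reconciliation of tp1 started at epoch 1; member was fenced and
        // rejoined, receiving the same tp1 assignment at epoch 3.
        System.out.println(currentlyDiscards(1, 3));              // true: result dropped, client stuck JOINING
        System.out.println(discardsIfAssignmentChanged(tp1, tp1)); // false: same assignment would be kept
    }
}
```

The mismatch between those two answers is exactly the stuck-JOINING scenario: the broker won't resend the unchanged assignment, so the discarded result is never replaced.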
[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16100: -- Parent: (was: KAFKA-15974) Issue Type: Improvement (was: Sub-task) > Add timeout to all the CompletableApplicationEvents > --- > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > Original Estimate: 40h > Remaining Estimate: 40h > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]
mjsax commented on PR #15151: URL: https://github.com/apache/kafka/pull/15151#issuecomment-1912805679 Merged to `trunk` and cherry-picked to `3.7` branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]
mjsax merged PR #15151: URL: https://github.com/apache/kafka/pull/15151 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]
mjsax commented on PR #15151: URL: https://github.com/apache/kafka/pull/15151#issuecomment-1912803502 Only `StreamsStandbyTask` failed, which was fixed recently via https://github.com/apache/kafka/pull/15217. Merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14404: fix overlap of streams-config sections & describe additional parameters [kafka]
mjsax commented on PR #15162: URL: https://github.com/apache/kafka/pull/15162#issuecomment-1912801281 Thanks for pushing this over the finish line @AyoubOm! Great PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Minor update to KafkaApisTest [kafka]
jolshan merged PR #15257: URL: https://github.com/apache/kafka/pull/15257 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15675: Improve worker liveness check during Connect integration tests [kafka]
C0urante commented on code in PR #15249: URL: https://github.com/apache/kafka/pull/15249#discussion_r1468141005 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -238,9 +238,6 @@ public void testBrokerCoordinator() throws Exception { connect.kafka().stopOnlyKafka(); -connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, -"Group of workers did not remain the same after broker shutdown"); - Review Comment: This check doesn't seem necessary. A distributed Connect worker does become unavailable when the underlying Kafka cluster is unavailable, and the existing check (that a request to `GET /` is successful) doesn't really prove much about worker liveness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15675: Improve worker liveness check during Connect integration tests [kafka]
C0urante commented on code in PR #15249: URL: https://github.com/apache/kafka/pull/15249#discussion_r1468141005 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -238,9 +238,6 @@ public void testBrokerCoordinator() throws Exception { connect.kafka().stopOnlyKafka(); -connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, -"Group of workers did not remain the same after broker shutdown"); - Review Comment: This check doesn't seem necessary, and fails now if left in. A distributed Connect worker does become unavailable when the underlying Kafka cluster is unavailable, and the existing check (that a request to `GET /` is successful) doesn't really prove much about worker liveness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16100: -- Remaining Estimate: 40h (was: 120h) Original Estimate: 40h (was: 120h) > Add timeout to all the CompletableApplicationEvents > --- > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > Original Estimate: 40h > Remaining Estimate: 40h > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16100: -- Remaining Estimate: 120h Original Estimate: 120h > Add timeout to all the CompletableApplicationEvents > --- > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > Original Estimate: 120h > Remaining Estimate: 120h > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15848) Consumer API timeout inconsistent between ConsumerDelegate implementations
[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15848: -- Parent: KAFKA-15974 Issue Type: Sub-task (was: Bug) > Consumer API timeout inconsistent between ConsumerDelegate implementations > -- > > Key: KAFKA-15848 > URL: https://issues.apache.org/jira/browse/KAFKA-15848 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > Fix For: 3.8.0 > > > The two {{ConsumerDelegate}} implementations ({{{}LegacyKafkaConsumer{}}} and > {{{}AsyncKafkaConsumer{}}}) have a fundamental difference related to their > use and interpretation of the {{Timer}} that is supplied. > h3. tl;dr > {{AsyncKafkaConsumer}} is very literal about the timeout, whereas > {{LegacyKafkaConsumer}} seems to give a little wiggle room. > {{LegacyKafkaConsumer}} is structured so that the logic it uses can check for > success of its operations _before_ checking the timer: > # Submit operation asynchronously > # Wait for operation to complete using {{NetworkClient.poll()}} > # Check for result > ## If successful, return success > ## If fatal failure, return failure > # Check timer > ## If timer expired, return failure > {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations: > # Submit operation asynchronously > # Wait for operation to complete using {{Future.get()}} > ## If operation timed out, {{Future.get()}} will throw a timeout error > # Check for result > ## If successful, return success > ## Otherwise, return failure > h3. How to reproduce > This causes subtle timing issues, but they can be easily reproduced via any > of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} > API. Here's a bit of code that illustrates the difference between the two > approaches. 
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a > manner similar to this: > {code:java} > public int getCount(Timer timer) { > do { > final RequestFuture<Integer> future = sendSomeRequest(partitions); > client.poll(future, timer); > if (future.isDone()) > return future.get(); > } while (timer.notExpired()); > return -1; > } > {code} > {{AsyncKafkaConsumer}} has similar logic, but it is structured like this: > {code:java} > private int getCount(Timer timer) { > try { > CompletableFuture<Integer> future = new CompletableFuture<>(); > applicationEventQueue.add(future); > return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); > } catch (TimeoutException e) { > return -1; > } > } > {code} > The call to {{add}} enqueues the network operation, but it then _immediately_ > invokes {{Future.get()}} with the timeout to implement a time-bounded > blocking call. Since this method is being called with a timeout of 0, it > _immediately_ throws a {{{}TimeoutException{}}}. > h3. Suggested fix > TBD :( -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired
[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15305: -- Parent: KAFKA-15974 Issue Type: Sub-task (was: Improvement) > The background thread should try to process the remaining task until the > shutdown timer is expired > -- > > Key: KAFKA-15305 > URL: https://issues.apache.org/jira/browse/KAFKA-15305 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > While working on https://issues.apache.org/jira/browse/KAFKA-15304: the > close() API supplies a timeout parameter so that the consumer can have a > grace period to process things before shutting down. The background thread > currently doesn't honor that: when close() is initiated, it immediately > closes all of its dependencies. > > This might not be desirable because there could be remaining tasks to be > processed before closing. The correct thing to do may be to first stop > accepting API requests, then let runOnce() continue to run until the > shutdown timer expires, and finally force-close all of its dependencies. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15475: -- Parent: KAFKA-15974 Issue Type: Sub-task (was: Bug) > Timeout request might retry forever even if the user API times out in > PrototypeAsyncConsumer > > > Key: KAFKA-15475 > URL: https://issues.apache.org/jira/browse/KAFKA-15475 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, kip-848-preview > Fix For: 3.8.0 > > > If a request times out in the background thread, it will be completed with a > TimeoutException, which is retriable. In the TopicMetadataRequestManager and > possibly other managers, the request might continue to be retried forever. > > There are two ways to fix this: > # Pass a timer to the manager to remove the inflight requests when it > expires. > # Pass the future to the application layer and continue to retry. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15974) Enforce that events and requests respect user-provided timeouts
[ https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15974: -- Description: The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to block waiting for the event to complete. The application thread will block for the timeout, but there is not yet a consistent manner in which events are timed out. (was: The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to block waiting for the event to complete. The application thread will block for the timeout, but there is not yet a consistent manner in which events are timed out. Steps: # Add the timeout to all the {{{}CompletableApplicationEvent{}}}s # Prune the event queue if events have expired before starting # Canceled by the background thread if the event expired after starting) > Enforce that events and requests respect user-provided timeouts > --- > > Key: KAFKA-15974 > URL: https://issues.apache.org/jira/browse/KAFKA-15974 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to > block waiting for the event to complete. The application thread will block > for the timeout, but there is not yet a consistent manner in which events are > timed out. -- This message was sent by Atlassian Jira (v8.20.10#820010)
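The "prune the event queue if events have expired before starting" step from the earlier version of this ticket's description can be sketched with a plain queue. `TimedEvent` is a hypothetical stand-in for `CompletableApplicationEvent`; the essential move is completing an expired event exceptionally so the application thread blocked on its future wakes up instead of waiting past the user's timeout.

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;

// Illustrative sketch of pruning expired events before processing. Names are
// hypothetical, not the actual consumer event classes.
public class EventQueuePruneSketch {
    static class TimedEvent {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        final long deadlineMs;
        TimedEvent(long deadlineMs) { this.deadlineMs = deadlineMs; }
    }

    // Removes every event whose deadline has passed, completing its future
    // exceptionally so Future.get() in the application thread throws promptly.
    static int pruneExpired(ConcurrentLinkedQueue<TimedEvent> queue, long nowMs) {
        int pruned = 0;
        for (Iterator<TimedEvent> it = queue.iterator(); it.hasNext(); ) {
            TimedEvent e = it.next();
            if (e.deadlineMs <= nowMs) {
                e.future.completeExceptionally(new TimeoutException("event expired"));
                it.remove();
                pruned++;
            }
        }
        return pruned;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<TimedEvent> q = new ConcurrentLinkedQueue<>();
        q.add(new TimedEvent(100)); // already expired at now=200
        q.add(new TimedEvent(500)); // still live
        System.out.println(pruneExpired(q, 200)); // 1: only the expired event removed
        System.out.println(q.size());             // 1: the live event remains
    }
}
```

The same pattern extends to the "cancel after starting" step: the background thread would complete the inflight event's future exceptionally once its deadline passes.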
[jira] [Created] (KAFKA-16200) Ensure RequestManager handling of expired timeouts are consistent
Kirk True created KAFKA-16200: - Summary: Ensure RequestManager handling of expired timeouts are consistent Key: KAFKA-16200 URL: https://issues.apache.org/jira/browse/KAFKA-16200 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16199) Prune the event queue if events have expired before starting
Kirk True created KAFKA-16199: - Summary: Prune the event queue if events have expired before starting Key: KAFKA-16199 URL: https://issues.apache.org/jira/browse/KAFKA-16199 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16100) Add timeout to all the CompletableApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16100: -- Summary: Add timeout to all the CompletableApplicationEvents (was: Consistent handling of timeouts and responses for new consumer ApplicationEvents) > Add timeout to all the CompletableApplicationEvents > --- > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15974) Enforce that events and requests respect user-provided timeouts
[ https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15974: -- Summary: Enforce that events and requests respect user-provided timeouts (was: Enforce that CompletableApplicationEvent has a timeout that is respected) > Enforce that events and requests respect user-provided timeouts > --- > > Key: KAFKA-15974 > URL: https://issues.apache.org/jira/browse/KAFKA-15974 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to > block waiting for the event to complete. The application thread will block > for the timeout, but there is not yet a consistent manner in which events are > timed out. > Steps: > # Add the timeout to all the {{{}CompletableApplicationEvent{}}}s > # Prune the event queue if events have expired before starting > # Canceled by the background thread if the event expired after starting -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16100: -- Parent: KAFKA-15974 Issue Type: Sub-task (was: Improvement) > Consistent handling of timeouts and responses for new consumer > ApplicationEvents > > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15974) Enforce that CompletableApplicationEvent has a timeout that is respected
[ https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15974: -- Description: The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to block waiting for the event to complete. The application thread will block for the timeout, but there is not yet a consistent manner in which events are timed out. Steps: # Add the timeout to all the {{{}CompletableApplicationEvent{}}}s # Prune the event queue if events have expired before starting # Canceled by the background thread if the event expired after starting was: The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to block waiting for the event to complete. The application thread will block for the timeout, but there is not yet a consistent manner in which events are timed out. Steps: 1. Add the timeout to all the {{CompletableApplicationEvent}}s 2. Prune the event queue if events have expired before starting 3. Canceled by the background thread if the event expired after starting > Enforce that CompletableApplicationEvent has a timeout that is respected > > > Key: KAFKA-15974 > URL: https://issues.apache.org/jira/browse/KAFKA-15974 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to > block waiting for the event to complete. The application thread will block > for the timeout, but there is not yet a consistent manner in which events are > timed out. > Steps: > # Add the timeout to all the {{{}CompletableApplicationEvent{}}}s > # Prune the event queue if events have expired before starting > # Canceled by the background thread if the event expired after starting -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14683 Migrate #testStartPaused to Mockito [kafka]
hgeraldino commented on code in PR #14663: URL: https://github.com/apache/kafka/pull/14663#discussion_r1468126514 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java: ## @@ -0,0 +1,763 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; +import org.apache.kafka.connect.runtime.WorkerSinkTask.SinkTaskMetricsGroup; +import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; +import org.apache.kafka.connect.runtime.errors.ErrorReporter; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; +import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.storage.ClusterConfigState; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.storage.StatusBackingStore; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.junit.After; 
+import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.MockitoRule; +import org.mockito.quality.Strictness; +import org.mockito.stubbing.Answer; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class WorkerSinkTaskMockitoTest { +// These are fixed to keep this code simpler. In this example we assume byte[] raw values +// with mix of integer/string in Connect +private static final String TOPIC = "test"; +private static final int PARTITION = 12; +private static final int PARTITION2 = 13; +private static final int PARTITION3 = 14; +private static final long FIRST_OFFSET = 45; +private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA; +private static final int
[jira] [Updated] (KAFKA-15974) Enforce that CompletableApplicationEvent has a timeout that is respected
[ https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15974: -- Priority: Blocker (was: Major) > Enforce that CompletableApplicationEvent has a timeout that is respected > > > Key: KAFKA-15974 > URL: https://issues.apache.org/jira/browse/KAFKA-15974 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to > block waiting for the event to complete. The application thread will block > for the timeout, but there is not yet a consistent manner in which events are > timed out. > Steps: > 1. Add the timeout to all the {{CompletableApplicationEvent}}s > 2. Prune the event queue if events have expired before starting > 3. Canceled by the background thread if the event expired after starting -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16100: -- Component/s: consumer > Consistent handling of timeouts and responses for new consumer > ApplicationEvents > > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16100: -- Priority: Blocker (was: Major) > Consistent handling of timeouts and responses for new consumer > ApplicationEvents > > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16100: -- Parent: (was: KAFKA-14048) Issue Type: Improvement (was: Sub-task) > Consistent handling of timeouts and responses for new consumer > ApplicationEvents > > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16122) TransactionsBounceTest -- server disconnected before response was received
[ https://issues.apache.org/jira/browse/KAFKA-16122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-16122. Resolution: Fixed > TransactionsBounceTest -- server disconnected before response was received > -- > > Key: KAFKA-16122 > URL: https://issues.apache.org/jira/browse/KAFKA-16122 > Project: Kafka > Issue Type: Test >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > I noticed a ton of tests failing with > h4. > {code:java} > Error org.apache.kafka.common.KafkaException: Unexpected error in > TxnOffsetCommitResponse: The server disconnected before a response was > received. {code} > {code:java} > Stacktrace org.apache.kafka.common.KafkaException: Unexpected error in > TxnOffsetCommitResponse: The server disconnected before a response was > received. at > app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1702) > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1236) > at > app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) > at > app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608) > at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600) > at > app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:457) > at > app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:334) > at > app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:249) > at java.base@21.0.1/java.lang.Thread.run(Thread.java:1583) {code} > The error indicates a network error which is retriable but the > TxnOffsetCommit handler doesn't expect this. > https://issues.apache.org/jira/browse/KAFKA-14417 addressed many of the other > requests but not this one. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
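The fix described in the ticket boils down to classifying a disconnect as a retriable network error instead of an unexpected one. A minimal, self-contained sketch of that classification; the string names are simplified stand-ins for Kafka's `Errors` enum, not the real `TransactionManager` code:

```java
import java.util.Set;

// Illustrative classifier for TxnOffsetCommit responses. Before the fix,
// a network error (server disconnected before a response was received)
// fell through to the "unexpected error" branch and surfaced as a
// KafkaException instead of being retried.
class TxnErrorClassifier {
    private static final Set<String> RETRIABLE = Set.of(
        "NETWORK_EXCEPTION",
        "COORDINATOR_LOAD_IN_PROGRESS",
        "COORDINATOR_NOT_AVAILABLE"
    );

    static String handle(String error) {
        if ("NONE".equals(error)) {
            return "COMPLETE";
        }
        // Retriable errors re-enqueue the request rather than failing it.
        return RETRIABLE.contains(error) ? "RETRY" : "FAIL";
    }
}
```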
Re: [PR] KAFKA-16122: Handle retriable errors in TxnOffsetCommits [kafka]
jolshan merged PR #15266: URL: https://github.com/apache/kafka/pull/15266 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16189; Extend admin to support ConsumerGroupDescribe API [kafka]
dajac commented on PR #15253: URL: https://github.com/apache/kafka/pull/15253#issuecomment-1912511055 cc @nizhikov @jolshan FYI - I have a few minor changes related to the consumer group command here, as I saw that you're working on migrating it to Java. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15987) Refactor ReplicaManager code for transaction verification
[ https://issues.apache.org/jira/browse/KAFKA-15987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-15987. Resolution: Fixed > Refactor ReplicaManager code for transaction verification > - > > Key: KAFKA-15987 > URL: https://issues.apache.org/jira/browse/KAFKA-15987 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > I started to do this in KAFKA-15784, but the diff was deemed too large and > confusing. I just wanted to file a followup ticket to reference this in code > for the areas that will be refactored. > > I hope to tackle it immediately after. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]
jolshan merged PR #15248: URL: https://github.com/apache/kafka/pull/15248 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
dajac commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1467998324 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1099,29 +1055,48 @@ private static boolean isGroupIdNotEmpty(String groupId) { } /** - * Handles the exception in the scheduleWriteOperation. - * @return The Errors instance associated with the given exception. + * This is the handler commonly used by all the operations that requires to convert errors to + * coordinator errors. The handler also handles and log unexpected errors. + * + * @param requestName The name of the request. + * @param request The request itself for logging purposes. + * @param exception The exception to handle. + * @param responseBuilder A function which takes an Errors and a String and returns + * the response. The String can be null. + * @return The response. + * @param The type of the request. Review Comment: Yeah, I agree. The types do not actually matter because the request is only used for logging and the response is only passed from the handler to the caller. It perhaps means that the structure of the helper is not right. Let me see if I can find an alternative. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
dajac commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1467996373 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -360,14 +334,13 @@ public CompletableFuture joinGroup( Duration.ofMillis(config.offsetCommitTimeoutMs), coordinator -> coordinator.classicGroupJoin(context, request, responseFuture) ).exceptionally(exception -> { -if (!(exception instanceof KafkaException)) { Review Comment: Sorry, I did not get your point initially. The errors for those APIs are also converted, but they are within `classicGroupJoin` based on the same scheme. So doing it here does not hurt. As a follow-up, I'd like to remove the translation there in order to have it centralized here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]
jolshan commented on PR #15248: URL: https://github.com/apache/kafka/pull/15248#issuecomment-1912499332 Sorry I meant to check this one yesterday but accidentally looked at your other PR. I thought I was waiting on build issues there, but should have been looking here. Looking now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lianetm commented on PR #15271: URL: https://github.com/apache/kafka/pull/15271#issuecomment-1912485870 Hey @lucasbru, thanks for the changes, not only fixing the bug but also moving in the direction we want of internally spreading the use of topicIds. Left a few comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lianetm commented on PR #15271: URL: https://github.com/apache/kafka/pull/15271#issuecomment-1912485446 Just to clarify, regarding the last note on the PR description, "if a new assignment or new metadata arrives during an ongoing reconciliation, it will never be applied". Maybe I'm missing something you're seeing, but the current logic does apply new assignments received or discovered while there is an ongoing reconciliation. The main issue to improve with [KAFKA-15832](https://issues.apache.org/jira/browse/KAFKA-15832) is not having to rely on 2 triggers, or receiving (same) assignment from the broker (be able then to better handle edge cases around receiving same assignment in different epoch not triggering reconciliation). Maybe I'm missing something, but if not then I would suggest we just remove that note? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15832: Trigger client reconciliation based on manager poll [kafka]
lucasbru opened a new pull request, #15275: URL: https://github.com/apache/kafka/pull/15275 Currently, the reconciliation logic on the client is triggered when a new target assignment is received and resolved, or when new unresolved target assignments are discovered in metadata. This change improves this by triggering the reconciliation logic on each poll iteration, to reconcile whatever is ready to be reconciled. This would require changes to support poll on the MembershipManager, and integrate it with the current polling logic in the background thread. Receiving a new target assignment from the broker, or resolving new topic names via a metadata update could only ensure that the #assignmentReadyToReconcile is properly updated (currently done), but wouldn't trigger the #reconcile() logic, leaving that to the #poll() operation. Stacked PR, do not review the first commit. Draft - still working out some details and some tests missing. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
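The flow proposed in the PR description can be sketched roughly as follows: events and metadata updates only record what is ready to be reconciled, and each poll iteration of the background thread performs the actual reconciliation. Class and method names here are illustrative, not the real `MembershipManager` API:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of poll-driven reconciliation.
class MembershipManagerSketch {
    private final Set<String> assignmentReadyToReconcile = new HashSet<>();
    private final Set<String> reconciledTopics = new HashSet<>();

    // Called when a target assignment arrives or a metadata update resolves
    // topic names: only update state, do not reconcile here.
    void onAssignmentReady(Collection<String> topics) {
        assignmentReadyToReconcile.addAll(topics);
    }

    // Called on each background-thread poll iteration: reconcile whatever
    // is ready at this point.
    void poll() {
        if (!assignmentReadyToReconcile.isEmpty()) {
            reconciledTopics.addAll(assignmentReadyToReconcile);
            assignmentReadyToReconcile.clear();
        }
    }

    Set<String> reconciledTopics() {
        return reconciledTopics;
    }
}
```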
Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]
jolshan merged PR #15087: URL: https://github.com/apache/kafka/pull/15087 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lianetm commented on code in PR #15271: URL: https://github.com/apache/kafka/pull/15271#discussion_r1467835947 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1340,18 +1336,15 @@ boolean reconciliationInProgress() { * When cluster metadata is updated, try to resolve topic names for topic IDs received in * assignment that hasn't been resolved yet. * - * Try to find topic names for all unresolved assignments + * Try to find topic names for all assignments * Add discovered topic names to the local topic names cache * If any topics are resolved, trigger a reconciliation process * If some topics still remain unresolved, request another metadata update * */ @Override public void onUpdate(ClusterResource clusterResource) { -resolveMetadataForUnresolvedAssignment(); -if (!assignmentReadyToReconcile.isEmpty()) { -reconcile(); -} +reconcile(); Review Comment: We should probably reconcile here only if we're in the RECONCILING state, I would say. We could receive metadata updates anytime. With the current reconcile implementation I expect nothing bad would happen by calling reconcile on a metadata update when there is nothing to reconcile or we are in some other state/transition, but probably clearer/safer to reason about it with a short-circuit here to avoid unexpected effects/logs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
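The short-circuit suggested in the comment above amounts to a state guard at the top of the metadata callback. A hypothetical sketch (simplified names, not the actual `MembershipManagerImpl` code):

```java
// Hypothetical member states, simplified from the review discussion.
enum MemberState { STABLE, RECONCILING, FENCED }

class GuardedMember {
    MemberState state = MemberState.STABLE;
    int reconcileCalls = 0;

    // Metadata updates can arrive at any time; only reconcile when the
    // member is actually mid-reconciliation.
    void onUpdate() {
        if (state != MemberState.RECONCILING) {
            return; // short-circuit: nothing to reconcile in this state
        }
        reconcile();
    }

    private void reconcile() {
        reconcileCalls++;
    }
}
```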
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lianetm commented on code in PR #15271: URL: https://github.com/apache/kafka/pull/15271#discussion_r1467947528 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1311,21 +1294,34 @@ public Map> currentAssignment() { return this.currentAssignment; } - /** * @return Set of topic IDs received in a target assignment that have not been reconciled yet - * because topic names are not in metadata. Visible for testing. + * because topic names are not in metadata or reconciliation hasn't finished. Visible for testing. */ -Set topicsWaitingForMetadata() { -return Collections.unmodifiableSet(assignmentUnresolved.keySet()); +Set topicsAwaitingReconciliation() { Review Comment: We are slightly changing the concept of what this does, and I'm fine with it because it's ok for how it's used in the tests, but I think the description/name/implementation are not well aligned? This is not strictly getting topicsAwaitingReconciliation ("not in metadata or reconciliation hasn't finished"), e.g. the member owns one partition of the topic, and another one is being added and stuck reconciling (committing offsets, revoking, etc); that topic wouldn't be returned here but it's indeed a `topicsAwaitingReconciliation` (for which "reconciliation hasn't finished"). What if we just rely on the `topicPartitionsAwaitingReconciliation` you defined below (that seems accurate to me), and here we just `return topicPartitionsAwaitingReconciliation().keySet();` I do know this is not causing trouble because of how it is used in the tests at the moment, but better to get it right to avoid confusion/misuse. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
jolshan commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1467940534 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1098,30 +1054,36 @@ private static boolean isGroupIdNotEmpty(String groupId) { return groupId != null && !groupId.isEmpty(); } -/** - * Handles the exception in the scheduleWriteOperation. - * @return The Errors instance associated with the given exception. - */ -private static Errors normalizeException(Throwable exception) { -exception = Errors.maybeUnwrapException(exception); - -if (exception instanceof UnknownTopicOrPartitionException || -exception instanceof NotEnoughReplicasException || -exception instanceof TimeoutException) { -return Errors.COORDINATOR_NOT_AVAILABLE; -} - -if (exception instanceof NotLeaderOrFollowerException || -exception instanceof KafkaStorageException) { -return Errors.NOT_COORDINATOR; -} - -if (exception instanceof RecordTooLargeException || -exception instanceof RecordBatchTooLargeException || -exception instanceof InvalidFetchSizeException) { -return Errors.UNKNOWN_SERVER_ERROR; +private RSP handleOperationException( +String requestName, +REQ request, +Throwable exception, +BiFunction responseBuilder +) { +ApiError apiError = ApiError.fromThrowable(exception); + +switch (apiError.error()) { +case UNKNOWN_SERVER_ERROR: +log.error("{} request {} hit an unexpected exception: {}.", +requestName, request, exception.getMessage(), exception); +return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, null); + +case UNKNOWN_TOPIC_OR_PARTITION: +case NOT_ENOUGH_REPLICAS: +case REQUEST_TIMED_OUT: +return responseBuilder.apply(Errors.COORDINATOR_NOT_AVAILABLE, null); + +case NOT_LEADER_OR_FOLLOWER: +case KAFKA_STORAGE_ERROR: +return responseBuilder.apply(Errors.NOT_COORDINATOR, null); + +case MESSAGE_TOO_LARGE: +case RECORD_LIST_TOO_LARGE: +case INVALID_FETCH_SIZE: +return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, null); + 
+default: +return responseBuilder.apply(apiError.error(), apiError.message()); Review Comment: Cool. The alternative is that we aren't returning the default string for the exception. I believe that is the case, but just wanted to confirm. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
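The distinction being discussed in this thread: the normalized branches pass a null message (so the response falls back to the error's default string), while the default branch forwards the original error and its own message. A simplified, self-contained sketch of that mapping, mirroring the switch in the diff with plain strings instead of Kafka's `Errors`/`ApiError` types (not the actual `GroupCoordinatorService` code):

```java
// Simplified stand-in for the (error, message) pair the handler builds.
class NormalizedError {
    final String error;
    final String message; // null means "use the error's default string"

    NormalizedError(String error, String message) {
        this.error = error;
        this.message = message;
    }
}

class ErrorNormalizer {
    static NormalizedError normalize(String error, String message) {
        switch (error) {
            // Storage/replication problems surface as coordinator unavailability.
            case "UNKNOWN_TOPIC_OR_PARTITION":
            case "NOT_ENOUGH_REPLICAS":
            case "REQUEST_TIMED_OUT":
                return new NormalizedError("COORDINATOR_NOT_AVAILABLE", null);
            case "NOT_LEADER_OR_FOLLOWER":
            case "KAFKA_STORAGE_ERROR":
                return new NormalizedError("NOT_COORDINATOR", null);
            case "MESSAGE_TOO_LARGE":
            case "RECORD_LIST_TOO_LARGE":
            case "INVALID_FETCH_SIZE":
                return new NormalizedError("UNKNOWN_SERVER_ERROR", null);
            default:
                // Unhandled errors keep their own message.
                return new NormalizedError(error, message);
        }
    }
}
```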
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
jolshan commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1467939251 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -524,37 +495,39 @@ public CompletableFuture listGroups( ); } -final CompletableFuture future = new CompletableFuture<>(); -final List results = new ArrayList<>(); final Set existingPartitionSet = runtime.partitions(); -final AtomicInteger cnt = new AtomicInteger(existingPartitionSet.size()); if (existingPartitionSet.isEmpty()) { return CompletableFuture.completedFuture(new ListGroupsResponseData()); } +final List>> futures = +new ArrayList<>(); + for (TopicPartition tp : existingPartitionSet) { -runtime.scheduleReadOperation( +futures.add(runtime.scheduleReadOperation( "list-groups", tp, (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset) -).handle((groups, exception) -> { -if (exception == null) { -synchronized (results) { -results.addAll(groups); -} +).exceptionally(exception -> { +exception = Errors.maybeUnwrapException(exception); Review Comment: I see. Makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
jolshan commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1467938411 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -360,14 +334,13 @@ public CompletableFuture joinGroup( Duration.ofMillis(config.offsetCommitTimeoutMs), coordinator -> coordinator.classicGroupJoin(context, request, responseFuture) ).exceptionally(exception -> { -if (!(exception instanceof KafkaException)) { Review Comment: Hmm -- in the KafkaException case we are converting errors though -- so the error handling for those seems to be changing right? if we return unknown_topic_or_partition we will convert the response to coordinator not available. Are you saying the error handling doesn't change because all the other errors are not expected to be returned and if they somehow do and get converted then it doesn't matter? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Uniformize error handling/transformation in GroupCoordinatorService [kafka]
jolshan commented on code in PR #15196: URL: https://github.com/apache/kafka/pull/15196#discussion_r1467936095 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1099,29 +1055,48 @@ private static boolean isGroupIdNotEmpty(String groupId) { } /** - * Handles the exception in the scheduleWriteOperation. - * @return The Errors instance associated with the given exception. + * This is the handler commonly used by all the operations that requires to convert errors to + * coordinator errors. The handler also handles and log unexpected errors. + * + * @param requestName The name of the request. + * @param request The request itself for logging purposes. + * @param exception The exception to handle. + * @param responseBuilder A function which takes an Errors and a String and returns + * the response. The String can be null. + * @return The response. + * @param The type of the request. Review Comment: hmmm. I see. It is a little confusing to me. If I was trying to add a new request/response, it is not clear exactly what needs to be supplied and what to get out. But maybe that's something I would need to look in to more and most people would understand. It seems like req is really just used for logging the incoming info when there is an error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
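To make the javadoc question above concrete: a handler of the shape being reviewed (request used only for logging, plus a BiFunction that builds the typed response from an error and an optional message) can be sketched as follows. This is a minimal illustration with invented names and a stand-in Errors enum, not the PR's actual handleOperationException.

```java
import java.util.function.BiFunction;

public class ErrorHandlerSketch {
    // Stand-in for org.apache.kafka.common.protocol.Errors; values are illustrative.
    enum Errors { COORDINATOR_NOT_AVAILABLE, UNKNOWN_SERVER_ERROR }

    // Hypothetical common handler: note the request parameter is only read when
    // logging an unexpected error, matching the observation in the review above.
    static <REQ, RES> RES handleException(
        String requestName,
        REQ request,
        Throwable exception,
        BiFunction<Errors, String, RES> responseBuilder
    ) {
        if (exception instanceof IllegalStateException) {
            // An expected, retriable coordinator-side condition: no message needed.
            return responseBuilder.apply(Errors.COORDINATOR_NOT_AVAILABLE, null);
        }
        // Unexpected error: log the request for debugging, return a generic error.
        System.err.println("Unexpected error handling " + requestName + " request " + request);
        return responseBuilder.apply(Errors.UNKNOWN_SERVER_ERROR, exception.getMessage());
    }

    public static void main(String[] args) {
        String response = handleException("list-groups", "req-1",
            new IllegalStateException("coordinator loading"),
            (error, message) -> error + ":" + message);
        System.out.println(response); // COORDINATOR_NOT_AVAILABLE:null
    }
}
```

A new request type only needs to supply the lambda that maps (error, message) into its own response class.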
[PR] KAFKA-14576: Move ConsoleConsumer to tools [kafka]
mimaison opened a new pull request, #15274: URL: https://github.com/apache/kafka/pull/15274
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-14585: Refactoring for moving the storage tool [kafka]
fvaleri opened a new pull request, #15273: URL: https://github.com/apache/kafka/pull/15273 Refactoring related to #14847. The following properties are migrated from KafkaConfig (core):
- LogConfig (storage): log.dir, log.dirs, metadata.log.dir, inter.broker.protocol.version, unstable.api.versions.enable, unstable.metadata.versions.enable
- RaftConfig (raft): node.id, process.roles
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lianetm commented on code in PR #15271: URL: https://github.com/apache/kafka/pull/15271#discussion_r1467868971 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1010,17 +1000,24 @@ private void resolveMetadataForUnresolvedAssignment() { Optional<String> nameFromMetadata = findTopicNameInGlobalOrLocalCache(topicId); nameFromMetadata.ifPresent(resolvedTopicName -> { // Name resolved, so assignment is ready for reconciliation. -addToAssignmentReadyToReconcile(topicId, resolvedTopicName, topicPartitions); +topicPartitions.forEach(tp -> { +TopicIdPartition topicIdPartition = new TopicIdPartition( +topicId, +new TopicPartition(resolvedTopicName, tp)); Review Comment: Indentation is off here
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lianetm commented on code in PR #15271: URL: https://github.com/apache/kafka/pull/15271#discussion_r1467868408 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -997,11 +985,13 @@ void markReconciliationCompleted() { * * */ -private void resolveMetadataForUnresolvedAssignment() { -assignmentReadyToReconcile.clear(); +private SortedSet resolveMetadataForTargetAssignment() { +final SortedSet assignmentReadyToReconcile = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR); +final HashMap> unresolved = new HashMap<>(currentTargetAssignment); + // Try to resolve topic names from metadata cache or subscription cache, and move // assignments from the "unresolved" collection, to the "readyToReconcile" one. Review Comment: Let's update this comment now that we don't have the "unresolved" and "readyToReconcile" collections exactly in that shape anymore.
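The resolution step being reviewed above (build the ready-to-reconcile set locally from the target assignment instead of clearing and mutating shared collections) can be sketched like this. Illustrative Java only: string topic ids and a plain name map stand in for the real Uuid-keyed assignment and metadata/subscription caches.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class AssignmentResolutionSketch {
    // Build the "ready to reconcile" set from the target assignment and a
    // (hypothetical) topicId -> topicName cache, returning it rather than
    // mutating shared state. Topics whose name is still unknown are simply
    // left out, to be retried on the next metadata update.
    static SortedSet<String> resolveReadyToReconcile(
        Map<String, List<Integer>> targetAssignment,
        Map<String, String> topicNameCache
    ) {
        SortedSet<String> ready = new TreeSet<>();
        targetAssignment.forEach((topicId, partitions) -> {
            String name = topicNameCache.get(topicId);
            if (name != null) {
                partitions.forEach(p -> ready.add(name + "-" + p));
            }
        });
        return ready;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> target = new HashMap<>();
        target.put("id-1", Arrays.asList(0, 1));
        target.put("id-2", Collections.singletonList(0)); // name not in cache yet
        System.out.println(resolveReadyToReconcile(target,
            Collections.singletonMap("id-1", "orders"))); // [orders-0, orders-1]
    }
}
```

Returning a fresh set each pass is what removes the need for the old "unresolved"/"readyToReconcile" member collections the comment still mentions.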
Re: [PR] MINOR: Clean up core server classes [kafka]
jlprat commented on PR #15272: URL: https://github.com/apache/kafka/pull/15272#issuecomment-1912344480 @showuon if you have some time, this is a follow-up of the one you reviewed already.
Re: [PR] MINOR: Clean up core server classes [kafka]
jlprat commented on code in PR #15272: URL: https://github.com/apache/kafka/pull/15272#discussion_r1467861593 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -80,7 +80,7 @@ object BrokerMetadataPublisher extends Logging { Option(newTopicsImage.getPartition(topicId, partitionId)) match { case Some(partition) => if (!partition.replicas.contains(brokerId)) { -info(s"Found stray log dir $log: the current replica assignment ${partition.replicas} " + +info(s"Found stray log dir $log: the current replica assignment ${partition.replicas.mkString("Array(", ", ", ")")} " + Review Comment: This actually should have printed the memory reference and not the contents. With the current version, it prints the contents. ## core/src/main/scala/kafka/server/ConfigAdminManager.scala: ## @@ -154,26 +154,25 @@ class ConfigAdminManager(nodeId: Int, throw new InvalidRequestException(s"Unknown resource type ${resource.resourceType().toInt}") } } catch { - case t: Throwable => { + case t: Throwable => val err = ApiError.fromThrowable(t) -info(s"Error preprocessing incrementalAlterConfigs request on ${configResource}", t) +info(s"Error preprocessing incrementalAlterConfigs request on $configResource", t) results.put(resource, err) - } } } }) results } - def validateBrokerConfigChange( + private def validateBrokerConfigChange( resource: IAlterConfigsResource, configResource: ConfigResource ): Unit = { val perBrokerConfig = !configResource.name().isEmpty val persistentProps = configRepository.config(configResource) val configProps = conf.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig) val alterConfigOps = resource.configs().asScala.map { - case config => + config => Review Comment: Previous code was the standard in really old version of Scala, since then anonymous function style is preferred. 
## core/src/main/scala/kafka/server/ConfigHelper.scala: ## @@ -80,7 +80,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo def describeConfigs(resourceToConfigNames: List[DescribeConfigsResource], includeSynonyms: Boolean, includeDocumentation: Boolean): List[DescribeConfigsResponseData.DescribeConfigsResult] = { -resourceToConfigNames.map { case resource => Review Comment: Previous code was the standard in really old version of Scala, since then anonymous function style is preferred. ## core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala: ## @@ -98,14 +98,14 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag } } quotaDelta.changes().entrySet().forEach { e => -handleUserClientQuotaChange(userClientEntity, e.getKey, e.getValue.asScala.map(_.toDouble)) +handleUserClientQuotaChange(userClientEntity, e.getKey, e.getValue.asScala) Review Comment: The map to Double wasn't needed as it was already a double ## core/src/main/scala/kafka/server/ConfigAdminManager.scala: ## @@ -369,15 +365,13 @@ object ConfigAdminManager { persistentResponses: AlterConfigsResponseData ): AlterConfigsResponseData = { val response = new AlterConfigsResponseData() -val responsesByResource = persistentResponses.responses().iterator().asScala.map { - case r => (r.resourceName(), r.resourceType()) -> new ApiError(r.errorCode(), r.errorMessage()) Review Comment: Previous code was the standard in really old version of Scala, since then anonymous function style is preferred. 
## core/src/main/scala/kafka/server/AutoTopicCreationManager.scala: ## @@ -50,6 +50,10 @@ trait AutoTopicCreationManager { object AutoTopicCreationManager { + /** + * @deprecated use [[apply(kafka.server.KafkaConfig, kafka.server.MetadataCache, scala.Option, scala.Option, scala.Option, scala.Option, org.apache.kafka.coordinator.group.GroupCoordinator, kafka.coordinator.transaction.TransactionCoordinator)]] + */ + @deprecated("Use the alternative apply method", "3.8.0") Review Comment: I could alternatively delete this method altogether. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
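One of the review comments above notes that interpolating an Array directly "should have printed the memory reference and not the contents", which Scala's mkString fixes. The same pitfall exists in Java, where it can be shown with Arrays.toString (the Java analogue of mkString); this small sketch is illustrative, not code from the PR.

```java
import java.util.Arrays;

public class ArrayLoggingSketch {
    public static void main(String[] args) {
        int[] replicas = {1, 2, 3};
        // Concatenating the array directly calls Object.toString: you get the
        // type tag and identity hash (e.g. "[I@1b6d3586"), not the contents.
        String bad = "replica assignment " + replicas;
        // Formatting explicitly (Scala's mkString, Java's Arrays.toString)
        // prints the actual elements.
        String good = "replica assignment " + Arrays.toString(replicas);
        System.out.println(bad.contains("[I@")); // true
        System.out.println(good);                // replica assignment [1, 2, 3]
    }
}
```

In both languages the fix is the same: never rely on an array's default toString in a log line.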
[jira] [Assigned] (KAFKA-16190) Member should send full heartbeat when rejoining
[ https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Quoc Phong Dang reassigned KAFKA-16190: --- Assignee: Quoc Phong Dang > Member should send full heartbeat when rejoining > > > Key: KAFKA-16190 > URL: https://issues.apache.org/jira/browse/KAFKA-16190 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Quoc Phong Dang >Priority: Major > Labels: client-transitions-issues, newbie > > The heartbeat request builder should make sure that all fields are sent in > the heartbeat request when the consumer rejoins (currently the > HeartbeatRequestManager request builder is only reset on failure scenarios). > This should fix the issue that a client that is subscribed to a topic and > gets fenced, should try to rejoin providing the same subscription it had. > Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses > this case given that it does explicitly change the subscription when it gets > fenced. We should ensure we test a consumer that keeps it same initial > subscription when it rejoins after being fenced. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16190) Member should send full heartbeat when rejoining
[ https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Quoc Phong Dang reassigned KAFKA-16190: --- Assignee: (was: Quoc Phong Dang) > Member should send full heartbeat when rejoining > > > Key: KAFKA-16190 > URL: https://issues.apache.org/jira/browse/KAFKA-16190 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: client-transitions-issues, newbie > > The heartbeat request builder should make sure that all fields are sent in > the heartbeat request when the consumer rejoins (currently the > HeartbeatRequestManager request builder is only reset on failure scenarios). > This should fix the issue that a client that is subscribed to a topic and > gets fenced, should try to rejoin providing the same subscription it had. > Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses > this case given that it does explicitly change the subscription when it gets > fenced. We should ensure we test a consumer that keeps it same initial > subscription when it rejoins after being fenced. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: Clean up core server classes [kafka]
jlprat opened a new pull request, #15272: URL: https://github.com/apache/kafka/pull/15272 Follow-up from https://github.com/apache/kafka/pull/15252
- Mark methods and fields private where possible
- Annotate public methods and fields
- Remove unused classes and methods
- Make sure Arrays are not printed with .toString
- Optimize minor warnings
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lianetm commented on code in PR #15271: URL: https://github.com/apache/kafka/pull/15271#discussion_r1467835947 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1340,18 +1336,15 @@ boolean reconciliationInProgress() { * When cluster metadata is updated, try to resolve topic names for topic IDs received in * assignment that hasn't been resolved yet. * - * Try to find topic names for all unresolved assignments + * Try to find topic names for all assignments * Add discovered topic names to the local topic names cache * If any topics are resolved, trigger a reconciliation process * If some topics still remain unresolved, request another metadata update * */ @Override public void onUpdate(ClusterResource clusterResource) { -resolveMetadataForUnresolvedAssignment(); -if (!assignmentReadyToReconcile.isEmpty()) { -reconcile(); -} +reconcile(); Review Comment: We should probably reconcile here only if we're in the RECONCILING state I would say. We could receive metadata updates anytime. With the current reconcile implementation I expect nothing bad would happen by calling this reconcile on a metadata update when there is nothing to reconcile, but it's still not the right thing to do, so probably better to short-circuit here to avoid undesired effects/logs
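The short-circuit the reviewer suggests above (only reconcile on a metadata update when the member is actually reconciling) is a one-line guard. A minimal sketch, with an invented MemberState enum and a counter standing in for reconcile():

```java
public class ReconcileGuardSketch {
    // Illustrative subset of the consumer member states; not the real enum.
    enum MemberState { STABLE, RECONCILING, LEAVING }

    static int reconciliations = 0;

    // Hypothetical guard: a metadata update triggers reconciliation only while
    // the member is RECONCILING; updates can arrive at any time otherwise.
    static void onMetadataUpdate(MemberState state) {
        if (state != MemberState.RECONCILING) {
            return; // nothing pending; avoid undesired effects/logs
        }
        reconciliations++; // stand-in for reconcile()
    }

    public static void main(String[] args) {
        onMetadataUpdate(MemberState.STABLE);
        onMetadataUpdate(MemberState.RECONCILING);
        onMetadataUpdate(MemberState.LEAVING);
        System.out.println(reconciliations); // 1
    }
}
```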
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lianetm commented on code in PR #15271: URL: https://github.com/apache/kafka/pull/15271#discussion_r1467829868 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -766,23 +755,28 @@ public void transitionToStale() { } /** - * Reconcile the assignment that has been received from the server and for which topic names - * are resolved, kept in the {@link #assignmentReadyToReconcile}. This will commit if needed, - * trigger the callbacks and update the subscription state. Note that only one reconciliation + * Reconcile the assignment that has been received from the server. If for some topics, the + * topic ID cannot be matched to a topic name, a metadata update will be triggered and only + * the subset of topics that are resolvable will be reconciled. Reconcilation will trigger the Review Comment: typo in Reconciliation
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lianetm commented on code in PR #15271: URL: https://github.com/apache/kafka/pull/15271#discussion_r1467829521 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -674,6 +662,7 @@ void transitionToSendingLeaveGroup() { ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; updateMemberEpoch(leaveEpoch); currentAssignment = new HashMap<>(); +targetAssignmentReconciled = currentAssignment.equals(currentTargetAssignment); Review Comment: I wonder if this could end up not being accurate in some case. Thinking about a consumer RECONCILING, and then leaving the group while on it. Here we would end up with `targetAssignmentReconciled` false, which is not accurate, as the target is not relevant anymore and will be discarded when that delayed reconciliation completes. Is there a value in the `targetAssignmentReconciled` var other than caching the sets comparison result? It definitely brings the challenge of making sure we update it with the right value, in all the right places. Would it be safer and simpler maybe just to keep the `targetAssignmentReconciled()` method, checking current vs target when needed? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -208,22 +209,18 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource private final Map assignedTopicNamesCache; /** - * Topic IDs received in a target assignment for which we haven't found topic names yet. - * Items are added to this set every time a target assignment is received. Items are removed - * when metadata is found for the topic. This is where the member collects all assignments - * received from the broker, even though they may not be ready to reconcile due to missing + * Topic IDs received in the last target assignment. Items are added to this set every time a Review Comment: nit: Topic IDs "and partitions"... -- This is an automated message from the Apache Git Service. 
Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]
dongnuo123 commented on code in PR #15268: URL: https://github.com/apache/kafka/pull/15268#discussion_r1467816570 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, List records) { } } +private JoinGroupRequestProtocol throwIfProtocolUnmatched( +ConsumerGroupMember member, +JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols +) { +for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : protocols) { +final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata()); +ConsumerProtocol.deserializeVersion(buffer); +final Optional generationId = ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId(); + +// If the generation id is provided, it must match the member epoch. +if (!generationId.isPresent() || generationId.get() == member.memberEpoch()) { +// TODO: need a list of all available server assignors +if (UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name()) +|| RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) { +return protocol; +} +} +} +throw new FencedMemberEpochException("The JoinGroup request doesn't have a matched generation id from a " + +"protocol supported by the server assignors with the epoch of the member known by the group coordinator (" + +member.memberEpoch() + ")."); +} + +private List transitionToConsumerGroupHeartbeatTopicPartitions( +List topicPartitions +) { +Map> topicMap = new HashMap<>(); +topicPartitions.forEach(tp -> +topicMap.computeIfAbsent(tp.topic(), __ -> new ArrayList<>()).add(tp.partition()) +); +return topicMap.entrySet().stream().map(item -> { +TopicImage topicImage = metadataImage.topics().getTopic(item.getKey()); +if (topicImage == null) { +throw INVALID_TOPIC_EXCEPTION.exception("Can't find the topic id of topic " + item.getKey() + "."); +} +return new ConsumerGroupHeartbeatRequestData.TopicPartitions() +.setTopicId(topicImage.id()) 
+.setPartitions(item.getValue()); +}).collect(Collectors.toList()); +} + +public CoordinatorResult upgradeGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +final int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs || +sessionTimeoutMs > classicGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +return EMPTY_RESULT; +} + +// Get or create the consumer group. +final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +throwIfConsumerGroupIsFull(group, memberId); + +// Get or create the member. +if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +// new dynamic member. +if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { +// If member id required, send back a response to call for another join group request with allocated member id. +log.info("Dynamic member with unknown member id joins group {}. 
" + +"Created a new member id {} and requesting the member to rejoin with this id.", +group.groupId(), memberId); + +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) +); +return EMPTY_RESULT; +} else { +member = group.getOrMaybeCreateMember(memberId, true); +newMemberCreated = !group.members().containsKey(memberId); +log.info("[GroupId {}] Member {} joins the consumer group.",
Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]
dongnuo123 commented on code in PR #15268: URL: https://github.com/apache/kafka/pull/15268#discussion_r1467814642 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, List records) { } } +private JoinGroupRequestProtocol throwIfProtocolUnmatched( +ConsumerGroupMember member, +JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols +) { +for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : protocols) { +final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata()); +ConsumerProtocol.deserializeVersion(buffer); +final Optional generationId = ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId(); + +// If the generation id is provided, it must match the member epoch. +if (!generationId.isPresent() || generationId.get() == member.memberEpoch()) { +// TODO: need a list of all available server assignors +if (UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name()) +|| RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) { +return protocol; +} +} +} +throw new FencedMemberEpochException("The JoinGroup request doesn't have a matched generation id from a " + +"protocol supported by the server assignors with the epoch of the member known by the group coordinator (" + +member.memberEpoch() + ")."); +} + +private List transitionToConsumerGroupHeartbeatTopicPartitions( +List topicPartitions +) { +Map> topicMap = new HashMap<>(); +topicPartitions.forEach(tp -> +topicMap.computeIfAbsent(tp.topic(), __ -> new ArrayList<>()).add(tp.partition()) +); +return topicMap.entrySet().stream().map(item -> { +TopicImage topicImage = metadataImage.topics().getTopic(item.getKey()); +if (topicImage == null) { +throw INVALID_TOPIC_EXCEPTION.exception("Can't find the topic id of topic " + item.getKey() + "."); +} +return new ConsumerGroupHeartbeatRequestData.TopicPartitions() +.setTopicId(topicImage.id()) 
+.setPartitions(item.getValue()); +}).collect(Collectors.toList()); +} + +public CoordinatorResult upgradeGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +final int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs || +sessionTimeoutMs > classicGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +return EMPTY_RESULT; +} + +// Get or create the consumer group. +final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false); +throwIfConsumerGroupIsFull(group, memberId); + +// Get or create the member. +if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +// new dynamic member. +if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) { +// If member id required, send back a response to call for another join group request with allocated member id. +log.info("Dynamic member with unknown member id joins group {}. 
" + +"Created a new member id {} and requesting the member to rejoin with this id.", +group.groupId(), memberId); + +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.MEMBER_ID_REQUIRED.code()) +); +return EMPTY_RESULT; +} else { +member = group.getOrMaybeCreateMember(memberId, true); +newMemberCreated = !group.members().containsKey(memberId); +log.info("[GroupId {}] Member {} joins the consumer group.",
Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]
dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467811816

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, List records) {
         }
     }

+    private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+        ConsumerGroupMember member,
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+    ) {
+        for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : protocols) {
+            final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+            ConsumerProtocol.deserializeVersion(buffer);
+            final Optional<Integer> generationId = ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+            // If the generation id is provided, it must match the member epoch.
+            if (!generationId.isPresent() || generationId.get() == member.memberEpoch()) {
+                // TODO: need a list of all available server assignors
+                if (UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+                    || RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+                    return protocol;
+                }
+            }
+        }
+        throw new FencedMemberEpochException("The JoinGroup request doesn't have a matched generation id from a " +
+            "protocol supported by the server assignors with the epoch of the member known by the group coordinator (" +
+            member.memberEpoch() + ").");
+    }
+
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> transitionToConsumerGroupHeartbeatTopicPartitions(
+        List<TopicPartition> topicPartitions
+    ) {
+        Map<String, List<Integer>> topicMap = new HashMap<>();
+        topicPartitions.forEach(tp ->
+            topicMap.computeIfAbsent(tp.topic(), __ -> new ArrayList<>()).add(tp.partition())
+        );
+        return topicMap.entrySet().stream().map(item -> {
+            TopicImage topicImage = metadataImage.topics().getTopic(item.getKey());
+            if (topicImage == null) {
+                throw INVALID_TOPIC_EXCEPTION.exception("Can't find the topic id of topic " + item.getKey() + ".");
+            }
+            return new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                .setTopicId(topicImage.id())
+                .setPartitions(item.getValue());
+        }).collect(Collectors.toList());
+    }
+
+    public CoordinatorResult upgradeGroupJoin(
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List records = new ArrayList<>();
+        final String groupId = request.groupId();
+        String memberId = request.memberId();
+        final String instanceId = request.groupInstanceId();
+        final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+        final int sessionTimeoutMs = request.sessionTimeoutMs();
+
+        if (sessionTimeoutMs < classicGroupMinSessionTimeoutMs ||
+            sessionTimeoutMs > classicGroupMaxSessionTimeoutMs
+        ) {
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(memberId)
+                .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+            );
+            return EMPTY_RESULT;
+        }
+
+        // Get or create the consumer group.
+        final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
+        throwIfConsumerGroupIsFull(group, memberId);
+
+        // Get or create the member.
+        if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        boolean newMemberCreated = false;
+        if (instanceId == null) {
+            // new dynamic member.
+            if (isUnknownMember && JoinGroupRequest.requiresKnownMemberId(context.apiVersion())) {
+                // If member id required, send back a response to call for another join group request with allocated member id.
+                log.info("Dynamic member with unknown member id joins group {}. " +
+                    "Created a new member id {} and requesting the member to rejoin with this id.",
+                    group.groupId(), memberId);
+
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.MEMBER_ID_REQUIRED.code())
+                );
+                return EMPTY_RESULT;
+            } else {
+                member = group.getOrMaybeCreateMember(memberId, true);
+                newMemberCreated = !group.members().containsKey(memberId);
+                log.info("[GroupId {}] Member {} joins the consumer group.",
Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]
dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467807980

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, List records) {
         }
     }

+    private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+        ConsumerGroupMember member,
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+    ) {
+        for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : protocols) {
+            final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+            ConsumerProtocol.deserializeVersion(buffer);
+            final Optional<Integer> generationId = ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+            // If the generation id is provided, it must match the member epoch.
+            if (!generationId.isPresent() || generationId.get() == member.memberEpoch()) {
+                // TODO: need a list of all available server assignors
+                if (UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+                    || RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+                    return protocol;
+                }
+            }
+        }
+        throw new FencedMemberEpochException("The JoinGroup request doesn't have a matched generation id from a " +
+            "protocol supported by the server assignors with the epoch of the member known by the group coordinator (" +
+            member.memberEpoch() + ").");

Review Comment:
   maybe handle the case where generation id doesn't match the member epoch.
Re: [PR] MINOR: cleanup core modules part 1 [kafka]
jlprat merged PR #15252:
URL: https://github.com/apache/kafka/pull/15252
Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]
dongnuo123 commented on code in PR #15268:
URL: https://github.com/apache/kafka/pull/15268#discussion_r1467804300

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -3467,6 +3476,358 @@ public void maybeDeleteGroup(String groupId, List records) {
         }
     }

+    private JoinGroupRequestProtocol throwIfProtocolUnmatched(
+        ConsumerGroupMember member,
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols
+    ) {
+        for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : protocols) {
+            final ByteBuffer buffer = ByteBuffer.wrap(protocol.metadata());
+            ConsumerProtocol.deserializeVersion(buffer);
+            final Optional<Integer> generationId = ConsumerProtocol.deserializeSubscription(buffer, (short) 0).generationId();
+
+            // If the generation id is provided, it must match the member epoch.
+            if (!generationId.isPresent() || generationId.get() == member.memberEpoch()) {
+                // TODO: need a list of all available server assignors
+                if (UniformAssignor.UNIFORM_ASSIGNOR_NAME.equals(protocol.name())
+                    || RangeAssignor.RANGE_ASSIGNOR_NAME.equals(protocol.name())) {
+                    return protocol;
+                }
+            }
+        }
+        throw new FencedMemberEpochException("The JoinGroup request doesn't have a matched generation id from a " +
+            "protocol supported by the server assignors with the epoch of the member known by the group coordinator (" +
+            member.memberEpoch() + ").");

Review Comment:
   not supported by the old protocol
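The review comments on this hunk ask for the stale-generation case to be reported separately from the unsupported-protocol case. A rough sketch of that split follows; the types, names, and exception choices are made-up stand-ins for illustration, not the PR's actual code:

```java
import java.util.List;
import java.util.OptionalInt;

class ProtocolMatcher {
    // Illustrative stand-in for a JoinGroup protocol entry: its name and the
    // optional generation id carried in the subscription metadata.
    record Proto(String name, OptionalInt generationId) {}

    // Hypothetical list of server-side assignor names.
    static final List<String> SERVER_ASSIGNORS = List.of("uniform", "range");

    // Returns the first protocol supported by a server-side assignor whose
    // generation id (if present) matches the member epoch. The failure message
    // distinguishes "stale generation id" from "no supported protocol".
    static Proto selectProtocol(int memberEpoch, List<Proto> protocols) {
        boolean sawStaleGeneration = false;
        for (Proto p : protocols) {
            if (p.generationId().isPresent() && p.generationId().getAsInt() != memberEpoch) {
                sawStaleGeneration = true;  // generation id provided but doesn't match the epoch
                continue;
            }
            if (SERVER_ASSIGNORS.contains(p.name())) {
                return p;
            }
        }
        if (sawStaleGeneration) {
            throw new IllegalStateException("Generation id does not match member epoch " + memberEpoch);
        }
        throw new IllegalStateException("No protocol supported by the server assignors");
    }
}
```

Separating the two failure modes lets the coordinator fence a stale member (the epoch mismatch) without masking the distinct case of a client that simply offers no server-supported assignor.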
Re: [PR] MINOR: cleanup core modules part 1 [kafka]
jlprat commented on PR #15252:
URL: https://github.com/apache/kafka/pull/15252#issuecomment-1912259898

   All test failures seem to be unrelated to these changes.
[jira] [Updated] (KAFKA-16198) Reconciliation may lose partitions when topic metadata is delayed
[ https://issues.apache.org/jira/browse/KAFKA-16198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16198:
-----------------------------------
    Labels: client-transitions-issues clients consumer kip-848 kip-848-client-support  (was: clients consumer kip-848 kip-848-client-support)

> Reconciliation may lose partitions when topic metadata is delayed
> -----------------------------------------------------------------
>
>                 Key: KAFKA-16198
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16198
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Lucas Brutschy
>            Assignee: Lucas Brutschy
>            Priority: Critical
>              Labels: client-transitions-issues, clients, consumer, kip-848, kip-848-client-support
>             Fix For: 3.8.0
>
> The current reconciliation code in `AsyncKafkaConsumer`s `MembershipManager`
> may lose part of the server-provided assignment when metadata is delayed. The
> reason is incorrect handling of partially resolved topic names, as in this
> example:
> * We get assigned {{T1-1}} and {{T2-1}}
> * We reconcile {{T1-1}}, {{T2-1}} remains in {{assignmentUnresolved}}
>   since the topic id {{T2}} is not known yet
> * We get new cluster metadata, which includes {{T2}}, so {{T2-1}} is
>   moved to {{assignmentReadyToReconcile}}
> * We call {{reconcile}} -- {{T2-1}} is now treated as the full assignment,
>   so {{T1-1}} is being revoked
> * We end up with assignment {{T2-1}}, which is inconsistent with the
>   broker-side target assignment.
>
> Generally, this seems to be a problem around semantics of the internal
> collections `assignmentUnresolved` and `assignmentReadyToReconcile`. Absence
> of a topic in `assignmentReadyToReconcile` may mean either revocation of the
> topic partition(s), or unavailability of a topic name for the topic.
> Internal state with simpler and correct invariants could be achieved by using
> a single collection `currentTargetAssignment` which is based on topic IDs and
> always corresponds to the latest assignment received from the broker. During
> every attempted reconciliation, all topic IDs will be resolved from the local
> cache, which should not introduce a lot of overhead. `assignmentUnresolved`
> and `assignmentReadyToReconcile` are removed.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
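The single-collection invariant proposed in the description can be sketched in miniature. This is a hypothetical model, not the consumer's real API: the names `onHeartbeatAssignment` and `resolvable`, and the map shapes, are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical miniature of the proposed invariant: one topic-id-keyed map that
// always mirrors the latest broker-provided target assignment. Late metadata can
// then never shrink the target -- unknown topic ids simply stay pending.
class TargetAssignment {
    private final Map<UUID, Set<Integer>> currentTargetAssignment = new HashMap<>();

    // Replace the whole target with the latest assignment from the broker.
    void onHeartbeatAssignment(Map<UUID, Set<Integer>> assignment) {
        currentTargetAssignment.clear();
        currentTargetAssignment.putAll(assignment);
    }

    // Resolve whatever topic ids the local metadata cache currently knows.
    // Ids missing from the cache remain in the target and are retried on the
    // next reconciliation attempt instead of being treated as revoked.
    Map<String, Set<Integer>> resolvable(Map<UUID, String> metadataCache) {
        Map<String, Set<Integer>> resolved = new HashMap<>();
        currentTargetAssignment.forEach((topicId, partitions) -> {
            String topicName = metadataCache.get(topicId);
            if (topicName != null) {
                resolved.put(topicName, new HashSet<>(partitions));
            }
        });
        return resolved;
    }
}
```

Because the target is keyed by topic id and replaced wholesale on each heartbeat, "not yet resolvable" and "revoked" can no longer be conflated: a partition leaves the target only when the broker removes it.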
Re: [PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lucasbru commented on PR #15271:
URL: https://github.com/apache/kafka/pull/15271#issuecomment-1912206859

   @lianetm Could you have a look?
[jira] [Commented] (KAFKA-16162) New created topics are unavailable after upgrading to 3.7
[ https://issues.apache.org/jira/browse/KAFKA-16162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811308#comment-17811308 ]

Gaurav Narula commented on KAFKA-16162:
---------------------------------------

Raised https://github.com/apache/kafka/pull/15270

> New created topics are unavailable after upgrading to 3.7
> ---------------------------------------------------------
>
>                 Key: KAFKA-16162
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16162
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.7.0
>            Reporter: Luke Chen
>            Priority: Blocker
>
> In 3.7, we introduced the KIP-858 JBOD feature, and the brokerRegistration
> request will include the `LogDirs` fields with UUID for each log dir in each
> broker. This info will be stored in the controller and used to identify if
> the log dir is known and online while handling AssignReplicasToDirsRequest
> [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java#L2093].
>
> While upgrading from old version, the kafka cluster will run in 3.7 binary
> with old metadata version, and then upgrade to newer version using
> kafka-features.sh. That means, while brokers startup and send the
> brokerRegistration request, it'll be using older metadata version without
> `LogDirs` fields included. And it makes the controller has no log dir info
> for all brokers. Later, after upgraded, if new topic is created, the flow
> will go like this:
> 1. Controller assign replicas and adds in metadata log
> 2. brokers fetch the metadata and apply it
> 3. ReplicaManager#maybeUpdateTopicAssignment will update topic assignment
> 4. After sending ASSIGN_REPLICAS_TO_DIRS to controller with replica
>    assignment, controller will think the log dir in current replica is offline,
>    so triggering offline handler, and reassign leader to another replica, and
>    offline, until no more replicas to assign, so assigning leader to -1 (i.e. no
>    leader)
>
> So, the results will be that new created topics are unavailable (with no
> leader) because the controller thinks all log dir are offline.
> {code:java}
> lukchen@lukchen-mac kafka % bin/kafka-topics.sh --describe --topic quickstart-events3 --bootstrap-server localhost:9092
>
> Topic: quickstart-events3	TopicId: s8s6tEQyRvmjKI6ctNTgPg	PartitionCount: 3	ReplicationFactor: 3	Configs: segment.bytes=1073741824
> 	Topic: quickstart-events3	Partition: 0	Leader: none	Replicas: 7,2,6	Isr: 6
> 	Topic: quickstart-events3	Partition: 1	Leader: none	Replicas: 2,6,7	Isr: 6
> 	Topic: quickstart-events3	Partition: 2	Leader: none	Replicas: 6,7,2	Isr: 6
> {code}
> The log snippet in the controller:
> {code:java}
> # handling 1st assignReplicaToDirs request
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned partition quickstart-events3:0 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,370] DEBUG [QuorumController id=1] Broker 6 assigned partition quickstart-events3:2 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,371] DEBUG [QuorumController id=1] Broker 6 assigned partition quickstart-events3:1 to OFFLINE dir 7K5JBERyyqFFxIXSXYluJA (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] offline-dir-assignment: changing partition(s): quickstart-events3-0, quickstart-events3-2, quickstart-events3-1 (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for quickstart-events3-0 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: [AA, AA, AA] -> [7K5JBERyyqFFxIXSXYluJA, AA, AA], partitionEpoch: 0 -> 1 (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] Replayed partition change PartitionChangeRecord(partitionId=0, topicId=6ZIeidfiSTWRiOAmGEwn_g, isr=null, leader=-2, replicas=null, removingReplicas=null, addingReplicas=null, leaderRecoveryState=-1, directories=[7K5JBERyyqFFxIXSXYluJA, AA, AA], eligibleLeaderReplicas=null, lastKnownELR=null) for topic quickstart-events3 (org.apache.kafka.controller.ReplicationControlManager)
> [2024-01-18 19:34:47,372] DEBUG [QuorumController id=1] partition change for quickstart-events3-2 with topic ID 6ZIeidfiSTWRiOAmGEwn_g: directories: [AA, AA, AA] -> [AA, 7K5JBERyyqFFxIXSXYluJA,
[PR] KAFKA-16198: Reconciliation may lose partitions when topic metadata is delayed [kafka]
lucasbru opened a new pull request, #15271:
URL: https://github.com/apache/kafka/pull/15271

   The current reconciliation code in `AsyncKafkaConsumer`s `MembershipManager` may lose part of the server-provided assignment when metadata is delayed. The reason is incorrect handling of partially resolved topic names, as in this example:

   * We get assigned `T1-1` and `T2-1`
   * We reconcile `T1-1`; `T2-1` remains in `assignmentUnresolved` since the topic id `T2` is not known yet
   * We get new cluster metadata, which includes `T2`, so `T2-1` is moved to `assignmentReadyToReconcile`
   * We call reconcile -- `T2-1` is now treated as the full assignment, so `T1-1` is being revoked
   * We end up with assignment `T2-1`, which is inconsistent with the broker-side target assignment.

   Generally, this seems to be a problem around the semantics of the internal collections `assignmentUnresolved` and `assignmentReadyToReconcile`. Absence of a topic in `assignmentReadyToReconcile` may either mean revocation of the topic partition(s), or unavailability of a topic name for the topic, depending on the context.

   This change reimplements that part of the internal state of `MembershipManagerImpl` with simpler and more correct invariants by using a single collection `currentTargetAssignment`, which is based on topic IDs and always corresponds to the latest assignment received from the broker. During every attempted reconciliation, all topic IDs will be resolved from the local cache, which should not introduce a lot of overhead. `assignmentUnresolved` and `assignmentReadyToReconcile` are removed.

   This change is in line with the goal of using topic IDs instead of topic names in the consumer internal state, and fixes the bug of losing partitions when delayed metadata arrives. A unit test covering the above situation is added.

   Note that this change does not fully solve the reconciliation problems, because if a new assignment or new metadata arrives during an ongoing reconciliation, it will never be applied. This will be solved in a separate change (KAFKA-15832).

### Committer Checklist (excluded from commit message)
 - [ ] Verify design and implementation
 - [ ] Verify test coverage and CI build status
 - [ ] Verify documentation (including upgrade notes)
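The two-collection bookkeeping this PR removes can be modeled in miniature. The class below is a deliberately simplified, hypothetical model, not the real `MembershipManagerImpl`, but it reproduces the lost-partition scenario from the description:

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of the pre-fix bookkeeping: partitions move
// from 'unresolved' to 'readyToReconcile' as topic metadata arrives, and
// reconcile() mistakes the ready set for the complete target assignment.
class BuggyReconciler {
    final Set<String> owned = new HashSet<>();
    final Set<String> unresolved = new HashSet<>();
    final Set<String> readyToReconcile = new HashSet<>();

    // Metadata for a topic arrived: its partitions become reconcilable.
    void onTopicMetadata(String topicPartition) {
        if (unresolved.remove(topicPartition)) {
            readyToReconcile.add(topicPartition);
        }
    }

    void reconcile() {
        // BUG: everything owned but absent from readyToReconcile is revoked,
        // even partitions that are merely waiting on topic metadata.
        owned.retainAll(readyToReconcile);
        owned.addAll(readyToReconcile);
        readyToReconcile.clear();
    }
}
```

Running the scenario from the description against this model (assign T1-1 ready and T2-1 unresolved, reconcile, deliver metadata for T2, reconcile again) ends with only T2-1 owned: T1-1 is revoked even though the broker never removed it, which is exactly the inconsistency the single-collection rewrite eliminates.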
Re: [PR] KAFKA-16162: resend broker registration on metadata update to IBP 3.7-IV2 [kafka]
gaurav-narula commented on PR #15270:
URL: https://github.com/apache/kafka/pull/15270#issuecomment-1912201646

   CC: @showuon @OmniaGM @cmccabe @pprovenzano
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
dajac commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1467690595

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:

@@ -40,6 +42,29 @@ enum GroupType {
     public String toString() {
         return name;
     }
+
+    /**
+     * Parse a string into the corresponding {@code GroupType} enum value, in a case-insensitive manner.
+     *
+     * @return The {{@link GroupType}} according to the string passed.
+     *
+     * @throws IllegalArgumentException If the input string does not match any {@code GroupType}.
+     */
+    public static GroupType parse(String typeString) {
+        for (GroupType type : GroupType.values()) {

Review Comment:
   Yeah, we don't have to change it here as it is not related. The minimum to do here is to also build a Map within the enum to do the lookup. It is better to be consistent.
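The enum-internal lookup map dajac suggests could look roughly like this. The constants and name strings below are illustrative assumptions, not Kafka's actual `GroupType` definition:

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

enum GroupType {
    // Illustrative constants; the real enum may differ.
    CONSUMER("consumer"),
    CLASSIC("classic"),
    UNKNOWN("unknown");

    private final String name;

    // Precomputed lowercase-name -> constant map, built once when the enum class
    // is initialized, so parse() is an O(1) lookup instead of a linear scan.
    private static final Map<String, GroupType> NAME_TO_ENUM = Arrays.stream(values())
        .collect(Collectors.toMap(type -> type.name.toLowerCase(Locale.ROOT), Function.identity()));

    GroupType(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }

    // Case-insensitive parse backed by the map.
    public static GroupType parse(String typeString) {
        GroupType type = NAME_TO_ENUM.get(typeString.toLowerCase(Locale.ROOT));
        if (type == null) {
            throw new IllegalArgumentException("Unknown group type: " + typeString);
        }
        return type;
    }
}
```

Building the map in a static initializer is safe because enum constants are constructed before static fields are initialized, so `values()` is fully populated by the time the stream runs.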
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
dajac commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1467689455

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:

@@ -452,21 +453,38 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- *                     If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to this shard
+ * @param statesFilter    The states of the groups we want to list.
+ *                        If empty, all groups are returned with their state.
+ * @param typesFilter     The types of the groups we want to list.
+ *                        If empty, all groups are returned with their type.
+ * @param committedOffset A specified committed offset corresponding to this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List listGroups(
+    Set statesFilter,
+    Set typesFilter,
+    long committedOffset
+) {
+    // Converts each string to a value in the GroupType enum while being case-insensitive.
+    Set enumTypesFilter = typesFilter.stream()
+        .map(Group.GroupType::parse)
+        .collect(Collectors.toSet());

Review Comment:
   > In the consumerGroupCommand tool we return an illegalArgumentException if the type is unknown so I thought maybe it would make sense to do that here as well.

   Yeah, the context is a bit different here. You have to think about how illegalArgumentException will get returned to the user. The Kafka protocol does not have any error for this so it converts it to a Unknown Server Error without any details. This is not great from a user perspective.

   I think that the correct way would be to try parsing the received group type. If it succeeds, great, we keep it. Otherwise, we simply discard it. If you want to keep the illegalArgumentException, you could catch it when you call `parse`.
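dajac's preferred approach -- try to parse each received type and silently discard the ones that don't parse, rather than letting an `IllegalArgumentException` surface to the client as an unhelpful Unknown Server Error -- might be sketched like this. The class, method names, and enum constants are hypothetical:

```java
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

class ListGroupsFilter {
    // Illustrative constants; the real Group.GroupType enum may differ.
    enum GroupType { CONSUMER, CLASSIC }

    // Try to parse a single type string; return empty() instead of throwing.
    static Optional<GroupType> tryParse(String typeString) {
        try {
            return Optional.of(GroupType.valueOf(typeString.toUpperCase(Locale.ROOT)));
        } catch (IllegalArgumentException e) {
            // An unknown type would otherwise reach the client as an
            // UNKNOWN_SERVER_ERROR without details, so drop it here instead.
            return Optional.empty();
        }
    }

    // Convert the request's raw type filter into enum values, discarding unknowns.
    static Set<GroupType> parseTypesFilter(Set<String> typesFilter) {
        return typesFilter.stream()
            .map(ListGroupsFilter::tryParse)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toSet());
    }
}
```

With this shape, a filter of {"consumer", "bogus"} simply lists consumer groups; the unrecognized entry matches nothing instead of failing the whole request.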
Re: [PR] KAFKA-16195: ignore metadata.log.dir failure in ZK mode [kafka]
gaurav-narula commented on PR #15262:
URL: https://github.com/apache/kafka/pull/15262#issuecomment-1912105919

   @showuon added a test with commit https://github.com/apache/kafka/pull/15262/commits/d0858deb5500ebdec071bc06ca6f98726ae6f602. Please take a look.