Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
lucasbru commented on PR #15437: URL: https://github.com/apache/kafka/pull/15437#issuecomment-1972669980 @philipnee Could you please act as a second pair of eyes here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]
lucasbru commented on code in PR #15445: URL: https://github.com/apache/kafka/pull/15445#discussion_r1508600306 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1277,8 +1280,7 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx // Visible for testing void maybeAutoCommitSync(final boolean shouldAutoCommit, - final Timer timer, - final AtomicReference firstException) { Review Comment: It's what the legacy consumer does. https://github.com/apache/kafka/blob/d066b94c8103cca166d7ea01a4b5bf5f65a3b838/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1151
Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]
mjsax commented on code in PR #15445: URL: https://github.com/apache/kafka/pull/15445#discussion_r1508477877 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1277,8 +1280,7 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx // Visible for testing void maybeAutoCommitSync(final boolean shouldAutoCommit, - final Timer timer, - final AtomicReference firstException) { Review Comment: > it looks like it is correct to only log the error here. Why? Should we not set `firstException` if `null` and `commitSync` throws?
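The alternative mjsax suggests (recording the first failure instead of only logging it) is a common shutdown pattern. A minimal sketch in plain Java follows; the names and the throwing `commitSync` stand-in are illustrative, not the actual AsyncKafkaConsumer code:

```java
import java.util.concurrent.atomic.AtomicReference;

public class FirstExceptionDemo {
    // Illustrative stand-in for a commit that may fail during close.
    static void commitSync() {
        throw new IllegalStateException("commit failed");
    }

    static void maybeAutoCommitSync(boolean shouldAutoCommit,
                                    AtomicReference<Throwable> firstException) {
        if (!shouldAutoCommit) return;
        try {
            commitSync();
        } catch (Throwable t) {
            // compareAndSet keeps only the FIRST failure; later failures are
            // dropped (the legacy consumer instead only logs here).
            firstException.compareAndSet(null, t);
        }
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        maybeAutoCommitSync(true, firstException);
        maybeAutoCommitSync(true, firstException); // second failure is ignored
        System.out.println(firstException.get().getMessage()); // prints "commit failed"
    }
}
```

The `compareAndSet(null, t)` idiom is what makes the "first exception wins" semantics atomic even if several shutdown steps fail concurrently.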
[jira] [Assigned] (KAFKA-15797) Flaky test EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true]
[ https://issues.apache.org/jira/browse/KAFKA-15797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-15797: --- Assignee: Matthias J. Sax > Flaky test EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true] > -- > > Key: KAFKA-15797 > URL: https://issues.apache.org/jira/browse/KAFKA-15797 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Justine Olshan >Assignee: Matthias J. Sax >Priority: Major > Labels: flaky-test > > I found two recent failures: > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/EosV2UpgradeIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldUpgradeFromEosAlphaToEosV2_true_/] > [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/EosV2UpgradeIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldUpgradeFromEosAlphaToEosV2_true__2/] > > Output generally looks like: > {code:java} > java.lang.AssertionError: Did not receive all 138 records from topic > multiPartitionOutputTopic within 6 ms, currently accumulated data is > [KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, > 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), > KeyValue(0, 45), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), > KeyValue(0, 91), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), > KeyValue(0, 91), KeyValue(0, 105), KeyValue(0, 120), KeyValue(0, 136), > KeyValue(0, 153), KeyValue(0, 171), KeyValue(0, 190), KeyValue(3, 0), > KeyValue(3, 1), KeyValue(3, 3), KeyValue(3, 6), KeyValue(3, 10), KeyValue(3, > 15), KeyValue(3, 21), KeyValue(3, 28), KeyValue(3, 36), KeyValue(3, 45), > KeyValue(3, 55), KeyValue(3, 66), KeyValue(3, 78), KeyValue(3, 91), > KeyValue(3, 105), KeyValue(3, 120), KeyValue(3, 136), KeyValue(3, 153), > KeyValue(3, 171), KeyValue(3, 190), KeyValue(3, 190), KeyValue(3, 210), > 
KeyValue(3, 231), KeyValue(3, 253), KeyValue(3, 276), KeyValue(3, 300), > KeyValue(3, 325), KeyValue(3, 351), KeyValue(3, 378), KeyValue(3, 406), > KeyValue(3, 435), KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, > 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), > KeyValue(1, 36), KeyValue(1, 45), KeyValue(1, 55), KeyValue(1, 66), > KeyValue(1, 78), KeyValue(1, 91), KeyValue(1, 105), KeyValue(1, 120), > KeyValue(1, 136), KeyValue(1, 153), KeyValue(1, 171), KeyValue(1, 190), > KeyValue(1, 120), KeyValue(1, 136), KeyValue(1, 153), KeyValue(1, 171), > KeyValue(1, 190), KeyValue(1, 210), KeyValue(1, 231), KeyValue(1, 253), > KeyValue(1, 276), KeyValue(1, 300), KeyValue(1, 325), KeyValue(1, 351), > KeyValue(1, 378), KeyValue(1, 406), KeyValue(1, 435), KeyValue(2, 0), > KeyValue(2, 1), KeyValue(2, 3), KeyValue(2, 6), KeyValue(2, 10), KeyValue(2, > 15), KeyValue(2, 21), KeyValue(2, 28), KeyValue(2, 36), KeyValue(2, 45), > KeyValue(2, 55), KeyValue(2, 66), KeyValue(2, 78), KeyValue(2, 91), > KeyValue(2, 105), KeyValue(2, 55), KeyValue(2, 66), KeyValue(2, 78), > KeyValue(2, 91), KeyValue(2, 105), KeyValue(2, 120), KeyValue(2, 136), > KeyValue(2, 153), KeyValue(2, 171), KeyValue(2, 190), KeyValue(2, 210), > KeyValue(2, 231), KeyValue(2, 253), KeyValue(2, 276), KeyValue(2, 300), > KeyValue(2, 325), KeyValue(2, 351), KeyValue(2, 378), KeyValue(2, 406), > KeyValue(0, 210), KeyValue(0, 231), KeyValue(0, 253), KeyValue(0, 276), > KeyValue(0, 300), KeyValue(0, 325), KeyValue(0, 351), KeyValue(0, 378), > KeyValue(0, 406), KeyValue(0, 435)] Expected: is a value equal to or greater > than <138> but: <134> was less than <138>{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15797: Fix flaky EOS_v2 upgrade test [kafka]
mjsax commented on PR #15449: URL: https://github.com/apache/kafka/pull/15449#issuecomment-1972488983 We have two runs of this test, one w/o error injection and one w/ error injection. The runs w/o error injection are more stable, and I did see a "verifyCommitted" check fail because of too much data (for keys/tasks that should not have committed); hence, I concluded that `commit.interval.ms` must have been hit. This will be hard to verify on the test w/o error injection because it's pretty stable. I don't know right now if it helps for the error-injection case. In fact, it could make it worse (I did not dig deep enough, but the test might actually require a low TX-timeout...) -- if this is the case, we would need to "hack" into `StreamsConfig` to allow us to disable the commit-interval/tx-timeout check so we can set a large commit interval and a small tx-timeout. I could not make the test fail locally, so I am hoping that a couple of Jenkins PR runs will give us some signal about how to move forward.
[PR] KAFKA-15797: Fix flaky EOS_v2 upgrade test [kafka]
mjsax opened a new pull request, #15449: URL: https://github.com/apache/kafka/pull/15449 Originally, we set the commit interval to MAX_VALUE for this test, to ensure we only commit explicitly. However, we needed to decrease it later on when adding the tx-timeout verification. We did see failing runs for which the commit interval was hit. This PR increases the commit interval to just below the test timeout to keep it from triggering.
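The configuration change the PR describes can be sketched with plain `Properties`; the value and constant name below are assumptions for the sketch, not the actual test code, and note that the real `StreamsConfig` additionally validates the commit interval against the transaction timeout under EOS (the check discussed above):

```java
import java.util.Properties;

public class CommitIntervalSketch {
    // Illustrative budget: the real test derives this from its own timeout.
    static final long TEST_TIMEOUT_MS = 600_000L;

    static Properties streamsTestConfig() {
        Properties props = new Properties();
        // Just below the test timeout, so the interval never fires on its own
        // and only explicit commits happen (previously Long.MAX_VALUE, which
        // had to be lowered for the tx-timeout verification).
        props.put("commit.interval.ms", Long.toString(TEST_TIMEOUT_MS - 1));
        props.put("processing.guarantee", "exactly_once_v2");
        return props;
    }
}
```

The design choice is the same in both directions: any interval comfortably larger than the test's own timeout guarantees the periodic commit never races with the explicit commits the test asserts on.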
Re: [PR] KAFKA-15307: Update/errors for deprecated config [kafka]
mjsax commented on code in PR #14448: URL: https://github.com/apache/kafka/pull/14448#discussion_r1508427425 ## .github/workflows/codesee-arch-diagram.yml: ## @@ -0,0 +1,23 @@ +# This workflow was added by CodeSee. Learn more at https://codesee.io/ Review Comment: I don't think this file belongs in this PR. Seems the tool you are using added it automatically... ## docs/streams/developer-guide/config-streams.html: ## @@ -91,6 +91,7 @@ rocksdb.config.setter state.dir topology.optimization + windowed.inner.class.serde Review Comment: As pointed out on the other PR, I think we should not add this config to the docs, but rather deprecate it similar to `window.size` via the KIP you started. ## docs/streams/developer-guide/config-streams.html: ## @@ -300,14 +306,20 @@ num.standby.replicasnull - default.windowed.key.serde.inner + default.windowed.key.serde.inner (Deprecated. Use windowed.inner.class.serde instead.) Medium -Default serializer/deserializer for the inner class of windowed keys, implementing the Serde interface. +<<< HEAD Review Comment: Conflict not resolved correctly. ## docs/streams/developer-guide/config-streams.html: ## @@ -257,7 +258,12 @@ num.standby.replicasThe maximum number of records to buffer per partition. 1000 - cache.max.bytes.buffering + statestore.cache.max.bytes +Medium +Maximum number of memory bytes to be used for record caches across all threads. Note that at the debug level you can use cache.size to monitor the actual size of the Kafka Streams cache. +10485760 + + cache.max.bytes.buffering (Deprecated. Use cache.max.bytes instead.) Review Comment: even/odd issue...
Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]
dongnuo123 commented on code in PR #15442: URL: https://github.com/apache/kafka/pull/15442#discussion_r1508436563 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3500,6 +3501,42 @@ public void maybeDeleteGroup(String groupId, List records) { } } +/** + * A group can be upgraded offline if it's a classic group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline upgrade is valid. + */ +private boolean validateOfflineUpgrade(String groupId) { +Group group = groups.get(groupId); +return group != null && group.type() == CLASSIC && group.isEmpty(); +} + +/** + * A group can be downgraded offline if it's a consumer group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline downgrade is valid. + */ +private boolean validateOfflineDowngrade(String groupId) { +Group group = groups.get(groupId); +return group != null && group.type() == CONSUMER && group.isEmpty(); +} + +/** + * Upgrade/Downgrade the empty group if it's valid. + * + * @param groupId The group id to be migrated. + * @param records The list of records to delete the previous group. + */ +public void maybeMigrateEmptyGroup(String groupId, List records, boolean isUpgrade) { +if ((isUpgrade && validateOfflineUpgrade(groupId)) || +(!isUpgrade && validateOfflineDowngrade(groupId))) { +deleteGroup(groupId, records); +removeGroup(groupId); Review Comment: There is a bug for downgrade. The sequence of downgrading is now: 1) the consumer group gets deleted; 2) classicGroupJoin creates a classic group and puts it into `groupMetadataManager.groups`; 3) the consumer group tombstone from `deleteGroup` is replayed, where `groups.get(groupId)` should be a consumer group.
Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]
mjsax commented on code in PR #14360: URL: https://github.com/apache/kafka/pull/14360#discussion_r1508421651 ## docs/streams/developer-guide/config-streams.html: ## @@ -300,8 +306,10 @@ num.standby.replicasnull - default.windowed.key.serde.inner + default.windowed.key.serde.inner (Deprecated. Use windowed.inner.class.serde instead.) Medium +<<< HEAD +<<< HEAD Review Comment: Seems you missed deleting some conflict-marker lines while resolving conflicts during rebasing. (more below) ## docs/streams/developer-guide/config-streams.html: ## @@ -1010,6 +1016,18 @@ topology.optimization + windowed.inner.class.serde + + + +Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface. + + +Note that setting this config in KafkaStreams application would result in an error as it is meant to be used only from Plain consumer client. Review Comment: I just did some more digging, and now I actually think that @ableegoldman is right, we might want to treat `windowed.inner.serde.class` similar to `window.size`... (ie, maybe remove from StreamsConfig -- we could add this to the KIP Lucia started). I also understand now why the docs say using it would result in an error (for both configs): Kafka Streams will always pass window-size and inner-serde via the _constructor_ and we will also verify that we don't get a parameter set twice (or zero times), and throw an error inside the `configure()` method of the windowed serdes... Thus, we might not want to add `windowed.inner.serde.class` to the docs in this PR to begin with... Sorry for the back and forth. Reading and understanding code is hard... ## docs/streams/developer-guide/config-streams.html: ## @@ -257,7 +258,12 @@ num.standby.replicasThe maximum number of records to buffer per partition. 1000 - cache.max.bytes.buffering + statestore.cache.max.bytes +Medium +Maximum number of memory bytes to be used for record caches across all threads. 
Note that at the debug level you can use cache.size to monitor the actual size of the Kafka Streams cache. Review Comment: > Note that at the debug level you can use cache.size to monitor the actual size of the Kafka Streams cache. What does this mean? Cannot follow. ## docs/streams/developer-guide/config-streams.html: ## @@ -326,6 +334,18 @@ num.standby.replicasDefault serializer/deserializer for the inner class of windowed keys, implementing the Serde interface. Deprecated. +=== +Default serializer/deserializer for the inner class of windowed keys, implementing the Serde interface. Review Comment: Duplicate line (both are not 100% the same) -- seems a conflict was not resolved correctly. ## docs/streams/developer-guide/config-streams.html: ## @@ -257,7 +258,12 @@ num.standby.replicasThe maximum number of records to buffer per partition. 1000 - cache.max.bytes.buffering + statestore.cache.max.bytes +Medium +Maximum number of memory bytes to be used for record caches across all threads. Note that at the debug level you can use cache.size to monitor the actual size of the Kafka Streams cache. +10485760 + + cache.max.bytes.buffering (Deprecated. Use cache.max.bytes instead.) Review Comment: If we insert a new row, we need to change "even / odd" for all rows below... super annoying... (otherwise we get two rows with the same background color instead of nicely interleaved rows)
Re: [PR] MINOR: Add 3.7 to Kafka Streams system tests [kafka]
mjsax commented on PR #15443: URL: https://github.com/apache/kafka/pull/15443#issuecomment-1972305073 A couple of system tests failed, but it seems due to env issues. Re-running: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6081/
[PR] KAFKA-16317: Add event process rate metric in GroupCoordinatorRuntimeMetrics [kafka]
jeffkbkim opened a new pull request, #15448: URL: https://github.com/apache/kafka/pull/15448 This metric will help identify how fast all coordinator threads process events. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-16316) Make the restore behavior of GlobalKTables with custom processors configureable
[ https://issues.apache.org/jira/browse/KAFKA-16316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16316: Labels: needs-kip (was: ) > Make the restore behavior of GlobalKTables with custom processors > configureable > --- > > Key: KAFKA-16316 > URL: https://issues.apache.org/jira/browse/KAFKA-16316 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > Labels: needs-kip > > Take the change implemented in > https://issues.apache.org/jira/browse/KAFKA-7663 and make it optional through > adding a couple methods to the API
[jira] [Created] (KAFKA-16317) Add event rate in GroupCoordinatorRuntimeMetrics
Jeff Kim created KAFKA-16317: Summary: Add event rate in GroupCoordinatorRuntimeMetrics Key: KAFKA-16317 URL: https://issues.apache.org/jira/browse/KAFKA-16317 Project: Kafka Issue Type: Sub-task Reporter: Jeff Kim Assignee: Jeff Kim We want a sensor to record every time we process a new event in the coordinator runtime.
[PR] MINOR: move TimeRatio to clients common [kafka]
jeffkbkim opened a new pull request, #15447: URL: https://github.com/apache/kafka/pull/15447 Move TimeRatio to `org.apache.kafka.common.metrics.stats` package ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-16316) Make the restore behavior of GlobalKTables with custom processors configureable
Walker Carlson created KAFKA-16316: -- Summary: Make the restore behavior of GlobalKTables with custom processors configureable Key: KAFKA-16316 URL: https://issues.apache.org/jira/browse/KAFKA-16316 Project: Kafka Issue Type: Improvement Components: streams Reporter: Walker Carlson Assignee: Walker Carlson Take the change implemented in https://issues.apache.org/jira/browse/KAFKA-7663 and make it optional through adding a couple methods to the API
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
artemlivshits commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1506982605 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } +@Override +public void describeTopics( +TopicCollection topics, +DescribeTopicsOptions options, +AdminResultsSubscriber subscriber) { +if (topics instanceof TopicIdCollection) { +subscriber.onError( +new IllegalArgumentException("Currently the describeTopics subscription mode does not support topic IDs.") +); +return; +} +if (!(topics instanceof TopicNameCollection)) { +subscriber.onError( +new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics.") +); +return; +} + +TreeSet<String> topicNames = new TreeSet<>(); +((TopicNameCollection) topics).topicNames().forEach(topicName -> { +if (topicNameIsUnrepresentable(topicName)) { + subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new InvalidTopicException("The given topic name '" +topicName + "' cannot be represented in a request."))); +} else { +topicNames.add(topicName); +} +}); + +RecurringCall call = new RecurringCall( Review Comment: I'm still not sure why we need the RecurringCall, I think something like this should be much less code: ``` ArrayBlockingQueue<DescribeTopicPartitionsResult> results = new ArrayBlockingQueue<>(5); Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { DescribeTopicPartitionsRequestData.Cursor currentCursor = new DescribeTopicPartitionsRequestData.Cursor(); // <...> @Override void handleResponse(AbstractResponse abstractResponse) { // ... Do the needful ... results.put(...); // ... if (hasMore) { // ... Set new cursor ... // ... 
runnable.call(this, now); } else { results.put(null); } } // <...> } runnable.call(call, time.milliseconds()); while (true) { DescribeTopicPartitionsResult result = results.take(); if (result == null) break; subscriber.onNext(result); } ``` And we won't need to create extra threads in TopicCommand etc.
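One detail worth flagging in the sketch above: `ArrayBlockingQueue` rejects `null` elements, so `results.put(null)` would throw; a dedicated sentinel object is the usual way to signal end-of-stream. A self-contained variant of the same producer/consumer hand-off pattern, with illustrative names that are not the KafkaAdminClient code:

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PagedResultsDemo {
    // Sentinel marking end-of-stream, since BlockingQueue forbids null elements.
    // A distinct object (not an interned literal) so '==' identifies it uniquely.
    static final String DONE = new String("DONE");

    public static void drainPages(List<List<String>> pages,
                                  BlockingQueue<String> results,
                                  List<String> consumed) throws InterruptedException {
        // Producer side: in the real client this work would happen in
        // handleResponse() as each paged response arrives.
        Thread producer = new Thread(() -> {
            try {
                for (List<String> page : pages) {
                    for (String r : page) results.put(r);
                }
                results.put(DONE); // no more pages: signal completion
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // Consumer side: block until the sentinel arrives.
        while (true) {
            String r = results.take();
            if (r == DONE) break; // reference comparison on the sentinel
            consumed.add(r);
        }
        producer.join();
    }
}
```

The bounded queue also gives natural back-pressure: with a small capacity the producer blocks until the consumer catches up, which is the point of the suggestion above versus spawning extra threads.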
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jeffkbkim commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1508185648 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; Review Comment: And if the request is not full, then we don't send a full response (i.e. omit the assignment)? Will you be fixing this in a different PR, and is there a Jira to track it?
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jeffkbkim commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1508179840 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1193,71 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. Review Comment: Can we add a bit more description on what "reconcile" means here?
Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]
jeffkbkim commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1508174554 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1211,13 +1192,99 @@ private CoordinatorResult consumerGr // 1. The member reported its owned partitions; // 2. The member just joined or rejoined to group (epoch equals to zero); // 3. The member's assignment has been updated. -if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) { +if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) { response.setAssignment(createResponseAssignment(updatedMember)); } return new CoordinatorResult<>(records, response); } +/** + * Reconciles the current assignment of the member if needed. + * + * @param groupId The group id. + * @param memberThe member to reconcile. + * @param currentPartitionEpoch The function returning the current epoch of + * a given partition. + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @param ownedTopicPartitions The list of partitions owned by the member. This + * is reported in the ConsumerGroupHeartbeat API and + * it could be null if not provided. + * @param records The list to accumulate any new records. + * @return The received member if no changes have been made; or a new + * member containing the new assignment. 
+ */ +private ConsumerGroupMember maybeReconcile( +String groupId, +ConsumerGroupMember member, +BiFunction currentPartitionEpoch, +int targetAssignmentEpoch, +Assignment targetAssignment, +List ownedTopicPartitions, +List records +) { +if (member.isReconciledTo(targetAssignmentEpoch)) { +return member; +} + +ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member) +.withTargetAssignment(targetAssignmentEpoch, targetAssignment) +.withCurrentPartitionEpoch(currentPartitionEpoch) +.withOwnedTopicPartitions(ownedTopicPartitions) +.build(); + +if (!updatedMember.equals(member)) { +records.add(newCurrentAssignmentRecord(groupId, updatedMember)); + +log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + + "assignedPartitions={} and revokedPartitions={}.", +groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), +formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions())); + +if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) { +scheduleConsumerGroupRebalanceTimeout( +groupId, +updatedMember.memberId(), +updatedMember.memberEpoch(), +updatedMember.rebalanceTimeoutMs() +); +} else { Review Comment: I'm confused why we changed the name from revocation to rebalance timeout, if it's actually the revocation timeout and a "rebalance" is considered complete only when we move to STABLE.
Re: [PR] KAFKA-14588 [WIP] ConfigCommand moved to tools [kafka]
nizhikov commented on PR #15417: URL: https://github.com/apache/kafka/pull/15417#issuecomment-1971882476 Hello @mimaison @jolshan It seems I'm done with rewriting the `ConfigCommand` code in Java. It can't be moved to `tools` right now, because of dependencies on `KafkaConfig` and other parts of `core`, but it passes all tests. Can you please help me with moving the dependencies of `ConfigCommand` to the correct modules? The PRs are here: #15075, #15387
Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]
lucasbru commented on code in PR #15445: URL: https://github.com/apache/kafka/pull/15445#discussion_r1508102432 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -892,6 +893,24 @@ public void testWakeupCommitted() { assertNull(consumer.wakeupTrigger().getPendingTask()); } +@Test +public void testNoWakeupInCloseCommit() { Review Comment: Mockito is weird sometimes! Hopefully fixed now
[jira] [Commented] (KAFKA-16190) Member should send full heartbeat when rejoining
[ https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822253#comment-17822253 ] Quoc Phong Dang commented on KAFKA-16190: - [~lianetm] I'm not sure how the extension of this test is supposed to work. When the consumer changes the subscription when it gets fenced, how can it keep the same initial one? Shouldn't it be the new one? > Member should send full heartbeat when rejoining > > > Key: KAFKA-16190 > URL: https://issues.apache.org/jira/browse/KAFKA-16190 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Quoc Phong Dang >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support, newbie > Fix For: 3.8.0 > > > The heartbeat request builder should make sure that all fields are sent in > the heartbeat request when the consumer rejoins (currently the > HeartbeatRequestManager request builder is reset on failure scenarios, which > should cover the fence+rejoin sequence). > Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses > this exact case given that it does explicitly change the subscription when it > gets fenced. We should ensure we test a consumer that keeps its same initial > subscription when it rejoins after being fenced.
Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]
cadonna commented on code in PR #15445: URL: https://github.com/apache/kafka/pull/15445#discussion_r1507973505 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -892,6 +893,24 @@ public void testWakeupCommitted() { assertNull(consumer.wakeupTrigger().getPendingTask()); } +@Test +public void testNoWakeupInCloseCommit() { Review Comment: The error I get is: ```java org.mockito.exceptions.verification.TooManyActualInvocations: applicationEventHandler.add( ); Wanted 1 time: -> at org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.add(ApplicationEventHandler.java:72) But was 2 times: -> at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.commit(AsyncKafkaConsumer.java:807) -> at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.prepareShutdown(AsyncKafkaConsumer.java:1272) ```
[jira] [Commented] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822250#comment-17822250 ] Bruno Cadonna commented on KAFKA-16290:
---------------------------------------
I changed the version to 4.0.0.

> Investigate propagating subscription state updates via queues
> --------------------------------------------------------------
>
>                 Key: KAFKA-16290
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16290
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Lucas Brutschy
>            Assignee: Bruno Cadonna
>            Priority: Critical
>              Labels: consumer-threading-refactor, kip-848, kip-848-client-support
>             Fix For: 4.0.0
>
> We mostly use the queues for interaction between the application thread and the background thread, but the subscription object is shared between the threads, and it is updated directly without going through the queues.
> The way we allow updates to the subscription state from both threads is definitely not right and will bring trouble. assign() is probably the most obvious place: we send an event to the background thread to commit, but then update the subscription in the foreground right away.
> It seems sensible to aim for having all updates to the subscription state happen in the background thread, triggered from the app thread via events (I think we already have related events for all updates; the subscription state was just left out in the app thread).
[jira] [Updated] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-16290:
----------------------------------
Fix Version/s: 4.0.0 (was: 3.8.0)
[jira] [Assigned] (KAFKA-16315) Investigate propagating metadata updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16315:
---------------------------------
Assignee: Kirk True

> Investigate propagating metadata updates via queues
> ----------------------------------------------------
>
>                 Key: KAFKA-16315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16315
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>              Labels: consumer-threading-refactor
>             Fix For: 4.0.0
>
> Some of the new {{AsyncKafkaConsumer}} logic enqueues events for the network I/O thread and then issues a call to update the {{ConsumerMetadata}} via {{requestUpdate()}}. If the event ends up stuck in the queue for a long time, it is possible that the metadata is not updated at the correct time.
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna commented on PR #15426: URL: https://github.com/apache/kafka/pull/15426#issuecomment-1971644323 @kirktrue are you fine with merging this PR and coming back to this after 3.8?
[jira] [Commented] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822246#comment-17822246 ] Kirk True commented on KAFKA-16290:
-----------------------------------
I just filed KAFKA-16315 which has a similar feel to this design issue, as we mutate the {{ConsumerMetadata}} in the application thread and background thread.
[jira] [Updated] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16290:
------------------------------
Labels: consumer-threading-refactor kip-848 kip-848-client-support (was: kip-848)
[jira] [Created] (KAFKA-16315) Investigate propagating metadata updates via queues
Kirk True created KAFKA-16315:
------------------------------

             Summary: Investigate propagating metadata updates via queues
                 Key: KAFKA-16315
                 URL: https://issues.apache.org/jira/browse/KAFKA-16315
             Project: Kafka
          Issue Type: Task
          Components: clients, consumer
    Affects Versions: 3.7.0
            Reporter: Kirk True
             Fix For: 4.0.0

Some of the new {{AsyncKafkaConsumer}} logic enqueues events for the network I/O thread and then issues a call to update the {{ConsumerMetadata}} via {{requestUpdate()}}. If the event ends up stuck in the queue for a long time, it is possible that the metadata is not updated at the correct time.
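The ordering hazard the ticket describes can be sketched in a few lines (all names below are illustrative stand-ins, not the AsyncKafkaConsumer internals): the application thread enqueues an event and then requests the metadata update directly, so the update request can take effect before the event it was meant to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch only: a toy event queue, not the consumer's real background thread.
public class MetadataOrderingSketch {
    static List<String> run() {
        Queue<Runnable> eventQueue = new ArrayDeque<>();
        List<String> log = new ArrayList<>();

        // Application thread: enqueue an event, then request the metadata
        // update directly instead of routing it through the queue.
        eventQueue.add(() -> log.add("event-processed"));
        log.add("metadata-update-requested"); // takes effect immediately

        // Background thread drains the queue some time later.
        while (!eventQueue.isEmpty()) eventQueue.poll().run();
        return log;
    }

    public static void main(String[] args) {
        List<String> log = run();
        // The update lands before the event it was meant to follow --
        // the inversion the ticket suggests fixing by routing the update
        // through the queue as well.
        if (!log.get(0).equals("metadata-update-requested")) throw new AssertionError();
    }
}
```

Routing the `requestUpdate()` through the same queue would make the two steps execute in the order they were issued.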
[jira] [Commented] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822245#comment-17822245 ] Kirk True commented on KAFKA-16290:
-----------------------------------
[~cadonna]—do we feel we have investigated this sufficiently? Are we making any changes in 3.8.0, or should we move this to 4.0.0 or just remove the version altogether? Thanks!
[jira] [Assigned] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16290:
---------------------------------
Assignee: Bruno Cadonna (was: Kirk True)
Re: [PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]
jeffkbkim commented on code in PR #15446: URL: https://github.com/apache/kafka/pull/15446#discussion_r1507948503

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1830,6 +1832,64 @@ public void onLoaded() { }); } +// TODO: this part is done asynchronously in the old coordinator via a KafkaScheduler#schedule +/**

Review Comment: I would like to point out that the existing coordinator does this asynchronously.
[PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]
jeffkbkim opened a new pull request, #15446: URL: https://github.com/apache/kafka/pull/15446

In the new coordinator, we have lingering timers (heartbeats/revocation timeouts/join groups/sync groups). For classic groups, we also have awaiting join/sync futures that are never completed. This patch cancels all existing timers and completes all awaiting futures when a group is unloaded.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
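The cleanup this PR describes can be sketched as follows. This is a hedged illustration with made-up types, not the actual GroupMetadataManager#onUnloaded code: cancel each pending timer and release each awaiting join/sync future (here completed exceptionally for illustration; the real coordinator would answer with an error response such as NOT_COORDINATOR).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch only: made-up types, not the coordinator's actual API.
public class GroupUnloadSketch {
    static class TimerHandle {
        boolean cancelled;
        void cancel() { cancelled = true; }
    }

    static class Group {
        final List<TimerHandle> timers = new ArrayList<>();
        final List<CompletableFuture<Void>> awaitingFutures = new ArrayList<>();

        // On unload: cancel pending timers and release blocked callers so
        // no future is left hanging forever.
        void onUnloaded() {
            timers.forEach(TimerHandle::cancel);
            awaitingFutures.forEach(f ->
                f.completeExceptionally(new IllegalStateException("group unloaded")));
        }
    }

    public static void main(String[] args) {
        Group group = new Group();
        group.timers.add(new TimerHandle());
        CompletableFuture<Void> pendingJoin = new CompletableFuture<>();
        group.awaitingFutures.add(pendingJoin);

        group.onUnloaded();
        if (!group.timers.get(0).cancelled) throw new AssertionError();
        if (!pendingJoin.isCompletedExceptionally()) throw new AssertionError();
    }
}
```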
Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]
Vaibhav-Nazare commented on PR #13817: URL: https://github.com/apache/kafka/pull/13817#issuecomment-1971535782 @mimaison any updates?
Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]
lucasbru commented on code in PR #15445: URL: https://github.com/apache/kafka/pull/15445#discussion_r1507865068 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java: ## @@ -97,6 +101,10 @@ public void setFetchAction(final FetchBuffer fetchBuffer) { } } +public void disableWakeups() { Review Comment: Done
Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]
lucasbru commented on code in PR #15445: URL: https://github.com/apache/kafka/pull/15445#discussion_r1507852098 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -892,6 +893,24 @@ public void testWakeupCommitted() { assertNull(consumer.wakeupTrigger().getPendingTask()); } +@Test +public void testNoWakeupInCloseCommit() { Review Comment: Could you provide some more details? It passed for me 5000 times.
Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]
cadonna commented on code in PR #15445: URL: https://github.com/apache/kafka/pull/15445#discussion_r1507827920

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java: ## @@ -97,6 +101,10 @@ public void setFetchAction(final FetchBuffer fetchBuffer) { } } +public void disableWakeups() {

Review Comment: Could you please add unit tests for this new method?

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -892,6 +893,24 @@ public void testWakeupCommitted() { assertNull(consumer.wakeupTrigger().getPendingTask()); } +@Test +public void testNoWakeupInCloseCommit() {

Review Comment: This unit test does not work for me locally.
Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]
dajac commented on code in PR #15442: URL: https://github.com/apache/kafka/pull/15442#discussion_r1507837970 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -474,6 +474,8 @@ public CoordinatorResult commitOffset( final List records = new ArrayList<>(); final long currentTimeMs = time.milliseconds(); final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs); +groupMetadataManager.maybeUpgradeEmptyGroup(group.groupId(), records, true); +final int initialRecordsSize = records.size(); Review Comment: My question was more about the `maybeUpgradeEmptyGroup` called here. Why do we need it? This seems to be orthogonal to the offset metadata manager.
Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]
dongnuo123 commented on code in PR #15442: URL: https://github.com/apache/kafka/pull/15442#discussion_r1507826893 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3500,6 +3503,59 @@ public void maybeDeleteGroup(String groupId, List records) { } } +/** + * A group can be upgraded offline if it's a classic group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline upgrade is valid. + */ +private boolean validateOfflineUpgrade(String groupId) { +Group group = groups.get(groupId); + +if (group == null || group.type() == CONSUMER) { +return false; +} + +ClassicGroup classicGroup = (ClassicGroup) group; +if (!classicGroup.isEmpty()) { +return false; +} else { +return true; +} +} + +/** + * Upgrade the empty classic group to a consumer group if it's valid. + * + * @param groupId The group id to be updated. + * @param records The list of records to delete the classic group and create the consumer group. + * @param isSimpleGroup The boolean indicating whether the group to be updated is a simple group. + */ +public void maybeUpgradeEmptyGroup(String groupId, List records, boolean isSimpleGroup) { Review Comment: I agree with that in the case of upgrading when a new member joins the empty group, but for the simple group we still need to create the new group. Is that correct?
Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]
dongnuo123 commented on code in PR #15442: URL: https://github.com/apache/kafka/pull/15442#discussion_r1507823457 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -474,6 +474,8 @@ public CoordinatorResult commitOffset( final List records = new ArrayList<>(); final long currentTimeMs = time.milliseconds(); final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs); +groupMetadataManager.maybeUpgradeEmptyGroup(group.groupId(), records, true); +final int initialRecordsSize = records.size(); Review Comment: This is because there is an `if (!records.isEmpty())` check at L513. If any new records are added, we need `if (records.size() > initialRecordsSize)` instead.
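The snapshot-the-size pattern described in that reply is worth spelling out as a generic sketch (strings stand in for coordinator records; this is not the actual OffsetMetadataManager code): once an earlier step may have appended records of its own, `isEmpty()` can no longer tell whether a later step appended anything, but comparing against the saved size can.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: strings stand in for coordinator records.
public class RecordsSizeSketch {
    // Hypothetical earlier step that may append records of its own.
    static void maybeAddUpgradeRecords(List<String> records, boolean upgrade) {
        if (upgrade) records.add("group-upgrade-record");
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        maybeAddUpgradeRecords(records, true);

        // Snapshot the size *after* the earlier step...
        final int initialRecordsSize = records.size();
        records.add("offset-commit-record");

        // ...because isEmpty() is now always false; only the size
        // comparison tells us whether the later step added anything.
        if (!(records.size() > initialRecordsSize)) throw new AssertionError();
    }
}
```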
Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]
lucasbru commented on code in PR #15445: URL: https://github.com/apache/kafka/pull/15445#discussion_r1507815350 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1277,8 +1280,7 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx // Visible for testing void maybeAutoCommitSync(final boolean shouldAutoCommit, - final Timer timer, - final AtomicReference firstException) { Review Comment: This parameter wasn't used, and it looks like it is correct to only log the error here.
Re: [PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]
lucasbru commented on PR #15445: URL: https://github.com/apache/kafka/pull/15445#issuecomment-1971423281 @cadonna Could you please take a look?
[PR] KAFKA-16167: Disable wakeups during autocommit on close [kafka]
lucasbru opened a new pull request, #15445: URL: https://github.com/apache/kafka/pull/15445

When the consumer is closed, we perform a synchronous autocommit. We don't want to be woken up here, because we are already executing a close operation under a deadline. This is in line with the behavior of the old consumer.

This fixes PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup, which is flaky on trunk: we return immediately from the synchronous commit with a WakeupException, which causes us to not wait for the commit to finish and thereby sometimes miss the committed offset when a new consumer is created.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
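The guard this PR describes can be reduced to a small sketch (illustrative only, not the actual WakeupTrigger implementation): once close() disables wakeups, a late wakeup() from another thread no longer interrupts the final synchronous autocommit.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: not the actual WakeupTrigger class.
public class WakeupGuardSketch {
    private final AtomicBoolean wakeupPending = new AtomicBoolean(false);
    private volatile boolean wakeupsDisabled = false;

    // Called once close() starts its final synchronous autocommit.
    public void disableWakeups() { wakeupsDisabled = true; }

    // wakeup() becomes a no-op after wakeups are disabled.
    public void wakeup() {
        if (!wakeupsDisabled) wakeupPending.set(true);
    }

    // A blocking call polls this to decide whether to throw WakeupException.
    public boolean shouldThrowWakeupException() {
        return wakeupPending.getAndSet(false);
    }

    public static void main(String[] args) {
        WakeupGuardSketch trigger = new WakeupGuardSketch();
        trigger.disableWakeups(); // entering close(): the commit must finish
        trigger.wakeup();         // late wakeup from another thread
        if (trigger.shouldThrowWakeupException()) throw new AssertionError();
    }
}
```

Without the guard, the pending wakeup would surface as a WakeupException inside the close-time commit, which is exactly the flakiness the PR description attributes to testAutoCommitOnCloseAfterWakeup.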
[jira] [Commented] (KAFKA-3370) Add options to auto.offset.reset to reset offsets upon initialization only
[ https://issues.apache.org/jira/browse/KAFKA-3370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822181#comment-17822181 ] zhangzhisheng commented on KAFKA-3370:
--------------------------------------
https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms

> Add options to auto.offset.reset to reset offsets upon initialization only
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-3370
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3370
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Vahid Hashemian
>            Priority: Major
>              Labels: needs-kip
>
> Currently "auto.offset.reset" is applied in the following two cases:
> 1) upon starting the consumer for the first time (hence no committed offsets before);
> 2) upon fetching offsets out-of-range.
> For scenarios where case 2) needs to be avoided (i.e. people need to be notified of out-of-range offsets rather than having offsets silently reset), "auto.offset.reset" needs to be set to "none". However, for case 1), setting "auto.offset.reset" to "none" will cause a NoOffsetForPartitionException upon polling. And in this case, seekToBeginning/seekToEnd is mistakenly applied to try to set the offset at initialization, although these methods are actually designed for use during the lifetime of the consumer (in a rebalance callback, for example).
> The fix proposal is to add two more options to "auto.offset.reset": "earliest-on-start" and "latest-on-start", whose semantics are "earliest" and "latest" for case 1) only, and "none" for case 2).
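The proposed semantics can be expressed compactly. The enum and method below are a sketch of the ticket's proposal, not an existing Kafka API: the `-on-start` variants resolve to `earliest`/`latest` only in case 1 (no committed offsets at initialization) and to `none` in case 2 (out-of-range fetch).

```java
// Sketch only: illustrative model of the proposed auto.offset.reset options.
public class OffsetResetSketch {
    enum ResetPolicy { EARLIEST, LATEST, NONE, EARLIEST_ON_START, LATEST_ON_START }

    // Returns the policy that effectively applies in the given situation.
    static ResetPolicy effective(ResetPolicy configured, boolean atInitialization) {
        switch (configured) {
            case EARLIEST_ON_START:
                return atInitialization ? ResetPolicy.EARLIEST : ResetPolicy.NONE;
            case LATEST_ON_START:
                return atInitialization ? ResetPolicy.LATEST : ResetPolicy.NONE;
            default:
                return configured; // earliest/latest/none apply in both cases
        }
    }

    public static void main(String[] args) {
        // Case 1: first start, no committed offsets -> reset to earliest.
        if (effective(ResetPolicy.EARLIEST_ON_START, true) != ResetPolicy.EARLIEST)
            throw new AssertionError();
        // Case 2: out-of-range fetch -> behave like "none" (surface the error).
        if (effective(ResetPolicy.EARLIEST_ON_START, false) != ResetPolicy.NONE)
            throw new AssertionError();
    }
}
```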
Re: [PR] KAFKA-15625: Do not flush global state store at each commit [kafka]
AyoubOm commented on code in PR #15361: URL: https://github.com/apache/kafka/pull/15361#discussion_r1507760037 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -1664,18 +1664,17 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() { EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes(); stateManager.checkpoint(); -EasyMock.expectLastCall().once(); +EasyMock.expectLastCall().times(2); EasyMock.expect(stateManager.changelogOffsets()) .andReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint .andReturn(singletonMap(changelogPartition, 10L)) .andReturn(singletonMap(changelogPartition, 20L)); -EasyMock.expectLastCall(); EasyMock.replay(stateManager, recordCollector); task = createStatefulTask(createConfig("100"), true); task.initializeIfNeeded(); -task.completeRestoration(noOpResetter -> { }); +task.completeRestoration(noOpResetter -> { }); // should checkpoint Review Comment: Done. Maybe I will create a minor separate PR for that
Re: [PR] MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs [kafka]
dajac merged PR #15432: URL: https://github.com/apache/kafka/pull/15432
Re: [PR] KAFKA-16261: updateSubscription fails if already empty subscription [kafka]
lucasbru commented on code in PR #15440: URL: https://github.com/apache/kafka/pull/15440#discussion_r1507649930 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -764,6 +766,44 @@ public void testLeaveGroupWhenMemberOwnsAssignment() { testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager); } +@Test +public void testLeaveGroupWhenAssignmentEmpty() { Review Comment: Replaced it with a simpler test
Re: [PR] KAFKA-16261: updateSubscription fails if already empty subscription [kafka]
lucasbru commented on code in PR #15440: URL: https://github.com/apache/kafka/pull/15440#discussion_r1507649418 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -764,6 +766,44 @@ public void testLeaveGroupWhenMemberOwnsAssignment() { testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager); } +@Test +public void testLeaveGroupWhenAssignmentEmpty() { +String topicName = "topic1"; +TopicPartition ownedPartition = new TopicPartition(topicName, 0); +MembershipManager membershipManager = createMemberInStableState(); + +// We own a partition and rebalance listener is registered + when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition)); +when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(mock(ConsumerRebalanceListener.class))); +doNothing().when(subscriptionState).markPendingRevocation(anySet()); + +// Trigger leave group +CompletableFuture leaveResult1 = membershipManager.leaveGroup(); + +// Rebalance listener not yet triggered +final ArgumentCaptor consumerRebalanceListener = + ArgumentCaptor.forClass(ConsumerRebalanceListenerCallbackNeededEvent.class); + verify(backgroundEventHandler).add(consumerRebalanceListener.capture()); +final ConsumerRebalanceListenerCallbackNeededEvent callbackEvent = consumerRebalanceListener.getValue(); +assertFalse(leaveResult1.isDone()); +assertEquals(MemberState.PREPARE_LEAVING, membershipManager.state()); + +// Clear the assignment + when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet()); +when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(false); + +// Complete the callback +callbackEvent.future().complete(null); + +// Completed the listener Review Comment: Done
Re: [PR] KAFKA-15625: Do not flush global state store at each commit [kafka]
cadonna commented on code in PR #15361: URL: https://github.com/apache/kafka/pull/15361#discussion_r1507618245 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -1664,18 +1664,17 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() { EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes(); stateManager.checkpoint(); -EasyMock.expectLastCall().once(); +EasyMock.expectLastCall().times(2); EasyMock.expect(stateManager.changelogOffsets()) .andReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint .andReturn(singletonMap(changelogPartition, 10L)) .andReturn(singletonMap(changelogPartition, 20L)); -EasyMock.expectLastCall(); EasyMock.replay(stateManager, recordCollector); task = createStatefulTask(createConfig("100"), true); task.initializeIfNeeded(); -task.completeRestoration(noOpResetter -> { }); +task.completeRestoration(noOpResetter -> { }); // should checkpoint Review Comment: These changes do not seem necessary. Could you please revert them?
Re: [PR] KAFKA-16261: updateSubscription fails if already empty subscription [kafka]
cadonna commented on code in PR #15440: URL: https://github.com/apache/kafka/pull/15440#discussion_r1507610459 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -764,6 +766,44 @@ public void testLeaveGroupWhenMemberOwnsAssignment() { testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager); } +@Test +public void testLeaveGroupWhenAssignmentEmpty() { +String topicName = "topic1"; +TopicPartition ownedPartition = new TopicPartition(topicName, 0); +MembershipManager membershipManager = createMemberInStableState(); + +// We own a partition and rebalance listener is registered + when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition)); +when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(mock(ConsumerRebalanceListener.class))); +doNothing().when(subscriptionState).markPendingRevocation(anySet()); + +// Trigger leave group +CompletableFuture leaveResult1 = membershipManager.leaveGroup(); + +// Rebalance listener not yet triggered +final ArgumentCaptor consumerRebalanceListener = + ArgumentCaptor.forClass(ConsumerRebalanceListenerCallbackNeededEvent.class); + verify(backgroundEventHandler).add(consumerRebalanceListener.capture()); +final ConsumerRebalanceListenerCallbackNeededEvent callbackEvent = consumerRebalanceListener.getValue(); +assertFalse(leaveResult1.isDone()); +assertEquals(MemberState.PREPARE_LEAVING, membershipManager.state()); + +// Clear the assignment + when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet()); +when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(false); + +// Complete the callback +callbackEvent.future().complete(null); + +// Completed the listener Review Comment: This comment does not make too much sense since the leave future is still verified to be not completed. 
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -764,6 +766,44 @@ public void testLeaveGroupWhenMemberOwnsAssignment() { testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager); } +@Test +public void testLeaveGroupWhenAssignmentEmpty() { Review Comment: Is there maybe a more direct way to test the bug? I had a hard time understanding this unit test. Never mind if there is no more direct way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16314) Add the new ABORTABLE_ERROR
[ https://issues.apache.org/jira/browse/KAFKA-16314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangzhisheng updated KAFKA-16314: -- Attachment: image-2024-02-29-21-43-51-000.png > Add the new ABORTABLE_ERROR > --- > > Key: KAFKA-16314 > URL: https://issues.apache.org/jira/browse/KAFKA-16314 > Project: Kafka > Issue Type: Sub-task >Reporter: Sanskar Jhajharia >Assignee: Sanskar Jhajharia >Priority: Major > Attachments: image-2024-02-29-21-43-51-000.png > > > As mentioned in the KIP, we would bump the ProduceRequest and ProduceResponse > to indicate that the server now returns a new ABORTABLE_ERROR. This error > would essentially require the client to abort the current transaction and > continue (without a need to restart the client). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16314) Add the new ABORTABLE_ERROR
[ https://issues.apache.org/jira/browse/KAFKA-16314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822138#comment-17822138 ] zhangzhisheng commented on KAFKA-16314: --- I think this ability is needed.
[jira] [Commented] (KAFKA-12261) Splitting partition causes message loss for consumers with auto.offset.reset=latest
[ https://issues.apache.org/jira/browse/KAFKA-12261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822135#comment-17822135 ] zhangzhisheng commented on KAFKA-12261: --- It is like this, but in my understanding, this is normal. > Splitting partition causes message loss for consumers with > auto.offset.reset=latest > --- > > Key: KAFKA-12261 > URL: https://issues.apache.org/jira/browse/KAFKA-12261 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.7.0 >Reporter: Haruki Okada >Assignee: Haruki Okada >Priority: Minor > Fix For: 3.6.0 > > > As of now, auto.offset.reset of ConsumerConfig is "latest" by default. > > This can be a pitfall that causes message delivery loss when we split a topic's partitions, as below: > Say we have a topic-X which has only 1 partition. > # split topic-X into 2 partitions by kafka-topics.sh --alter --topic topic-X --partitions 2 (topic-X-1 is added) > # the producer learns that new partitions were added by refreshing metadata, and starts to produce to topic-X-1 > # a bit later, the consumer learns that new partitions were added, triggering a consumer rebalance, and starts consuming topic-X-1 > * > ** upon starting consumption, it resets its offset to the log-end offset > If the producer sent several records before step 3, they might not be delivered to the consumer. > > > This behavior isn't preferable in most cases, so it should be documented in AUTO_OFFSET_RESET_DOC at least.
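The pitfall described in this ticket can be worked around on the consumer side by overriding the default reset policy. A minimal sketch (the helper name, bootstrap servers, and group id are illustrative, not from the ticket):

```java
import java.util.Properties;

public class ConsumerResetConfig {
    // Build consumer properties that avoid the partition-split pitfall by
    // resetting to the earliest offset instead of the default "latest".
    public static Properties safeConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // With "latest" (the default), a consumer that first sees a freshly
        // created partition resets to the log-end offset and silently skips
        // any records produced before its rebalance completed.
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
```

The trade-off is that "earliest" can cause reprocessing of old records when a group has no committed offset, so it is a per-use-case decision rather than a universal fix.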
Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]
chiacyu commented on PR #15444: URL: https://github.com/apache/kafka/pull/15444#issuecomment-1971140347 Hi, @showuon Please help to review this PR. Thanks!
[PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]
chiacyu opened a new pull request, #15444: URL: https://github.com/apache/kafka/pull/15444 Hi all, this changes the function to handle the null pointer case in a better way. Thanks all! ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: Cleanup log.dirs in ReplicaManagerTest on JVM exit [kafka]
gaurav-narula commented on code in PR #15289: URL: https://github.com/apache/kafka/pull/15289#discussion_r1507510343 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -137,7 +137,18 @@ object TestUtils extends Logging { val parentFile = new File(parent) parentFile.mkdirs() -JTestUtils.tempDirectory(parentFile.toPath, null) +val result = JTestUtils.tempDirectory(parentFile.toPath, null) + +parentFile.deleteOnExit() +Exit.addShutdownHook("delete-temp-log-dir-on-exit", { + try { +Utils.delete(parentFile) Review Comment: Good point - turns out we can just use `parentFile.deleteOnExit()`. Updated in [394f2e3](https://github.com/apache/kafka/pull/15289/commits/394f2e30e3ac2110921f8e1ea2978c405194a11b)
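For context on why `deleteOnExit()` can replace a shutdown hook here: the JVM deletes files registered via `File.deleteOnExit()` in reverse registration order on normal shutdown, and a directory is only removed if it is empty at that point. A small illustrative sketch (class and method names are hypothetical, not the PR's code):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class TempDirCleanup {
    // Create a temp directory that the JVM attempts to remove on normal exit.
    // deleteOnExit() deletions run in reverse registration order (LIFO), and a
    // directory is only deleted if it is empty at shutdown, so the parent must
    // be registered *before* any children created inside it are registered.
    public static File tempDirDeletedOnExit(String prefix) throws IOException {
        File dir = Files.createTempDirectory(prefix).toFile();
        dir.deleteOnExit();
        return dir;
    }
}
```

Note that `deleteOnExit()` only runs on a normal JVM exit; a killed test JVM still leaves the directory behind, which is presumably acceptable for test temp dirs.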
Re: [PR] KAFKA-15625: Do not flush global state store at each commit [kafka]
AyoubOm commented on PR #15361: URL: https://github.com/apache/kafka/pull/15361#issuecomment-1971038693 > @AyoubOm Thanks for the updates! > > Here are my replies. Thanks @cadonna, I made the updates accordingly.
Re: [PR] KAFKA-14589 [3/3] Tests of ConsoleGroupCommand rewritten in java [kafka]
nizhikov commented on PR #15365: URL: https://github.com/apache/kafka/pull/15365#issuecomment-1971031598 Hello, @rreddy-22 @dajac Can you please take a look at these changes? This PR moves tests of `ConsoleGroupCommand` from the Authorizer and Sasl tests to `tools`. It is very small (+200, -100). The big PR where all the command code is rewritten in Java is https://github.com/apache/kafka/pull/14471 Right now, changes in `ConsoleGroupCommand` lead to conflicts in my PR that are hard to resolve.
Re: [PR] KAFKA-14589 [2/3] Tests of ConsoleGroupCommand rewritten in java [kafka]
nizhikov commented on PR #15363: URL: https://github.com/apache/kafka/pull/15363#issuecomment-1971028532 Hello, @rreddy-22 @dajac Can you please take a look at these changes? This PR moves `DescribeConsumerGroupTest` and `ResetConsumerGroupOffsetTest` as part of moving `ConsoleGroupCommand` to `tools`. Having it in trunk will reduce your work while improving `ConsoleGroupCommand` (no need to duplicate test changes in Java and Scala) and will help move the command to `tools`. The big PR where all the command code is rewritten in Java is #14471 Right now, changes in `ConsoleGroupCommand` lead to conflicts in my PR that are hard to resolve.
Re: [PR] KAFKA-16190: Member should send full heartbeat when rejoining [kafka]
lucasbru commented on code in PR #15401: URL: https://github.com/apache/kafka/pull/15401#discussion_r1507409271 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -524,10 +524,10 @@ public void testHeartbeatState() { assertEquals(memberId, data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); -assertEquals(-1, data.rebalanceTimeoutMs()); Review Comment: The ticket asks for an extension of this test: > Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses this exact case given that it does explicitly change the subscription when it gets fenced.
Re: [PR] KAFKA-16190: Member should send full heartbeat when rejoining [kafka]
lucasbru commented on code in PR #15401: URL: https://github.com/apache/kafka/pull/15401#discussion_r1507409979 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -545,6 +545,15 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { // MemberEpoch - always sent data.setMemberEpoch(membershipManager.memberEpoch()); +// Sent all fields if the member is joining/rejoining the group Review Comment: nit: "Send"
Re: [PR] KAFKA-16261: updateSubscription fails if already empty subscription [kafka]
cadonna commented on PR #15440: URL: https://github.com/apache/kafka/pull/15440#issuecomment-1970897046 @lucasbru Unit tests are failing in `MembershipManagerImplTest`.
Re: [PR] KAFKA-16231: Update consumer_test.py to support KIP-848’s group protocol config [kafka]
lucasbru commented on code in PR #15330: URL: https://github.com/apache/kafka/pull/15330#discussion_r1507381845 ## tests/kafkatest/tests/verifiable_consumer_test.py: ## @@ -72,16 +86,59 @@ def await_produced_messages(self, producer, min_messages=1000, timeout_sec=10): err_msg="Timeout awaiting messages to be produced and acked") def await_consumed_messages(self, consumer, min_messages=1): +timeout_sec = self.consumption_timeout_sec current_total = consumer.total_consumed() -wait_until(lambda: consumer.total_consumed() >= current_total + min_messages, - timeout_sec=self.consumption_timeout_sec, - err_msg="Timed out waiting for consumption") +expected = current_total + min_messages + +def _condition(): +return consumer.total_consumed() >= expected + +def _err_msg(): +actual = consumer.total_consumed() +return "%d messages received within the timeout of %d seconds, expected %d" % (actual, timeout_sec, expected) + +wait_until(lambda: _condition(), timeout_sec=timeout_sec, err_msg=_err_msg()) + +def await_members_stopped(self, consumer, num_consumers, timeout_sec): +self._await_members_in_state(consumer, num_consumers, "stopped", [ConsumerState.Dead], timeout_sec) def await_members(self, consumer, num_consumers): # Wait until all members have joined the group -wait_until(lambda: len(consumer.joined_nodes()) == num_consumers, - timeout_sec=self.session_timeout_sec*2, - err_msg="Consumers failed to join in a reasonable amount of time") - +states = [ConsumerState.Joined] + +if consumer_group.is_consumer_group_protocol_enabled(consumer.group_protocol): +states.extend([ConsumerState.Started, ConsumerState.Rebalancing]) Review Comment: Why do we have to do this? Are we still checking the same thing if we consider "started" as "joined"? 
## tests/kafkatest/tests/client/consumer_test.py: ## @@ -201,7 +200,7 @@ def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quor bounce_mode=["all", "rolling"], num_bounces=[5], metadata_quorum=[quorum.isolated_kraft], -use_new_coordinator=[True, False] +use_new_coordinator=[True] Review Comment: Why no group protocol parameter here?
Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]
cadonna merged PR #15438: URL: https://github.com/apache/kafka/pull/15438
Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]
cadonna commented on PR #15438: URL: https://github.com/apache/kafka/pull/15438#issuecomment-1970829159 The test failures are unrelated.
Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]
cadonna commented on code in PR #15438: URL: https://github.com/apache/kafka/pull/15438#discussion_r1507345366 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java: ## @@ -32,36 +36,42 @@ public enum Type { private final Type type; +/** + * This identifies a particular event. It is used to disambiguate events via {@link #hashCode()} and + * {@link #equals(Object)} and can be used in log messages when debugging. + */ +private final Uuid id; Review Comment: In any case, we can add the counter or sequence number later. It does not have to block this PR.
Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]
cadonna commented on code in PR #15438: URL: https://github.com/apache/kafka/pull/15438#discussion_r1507320914 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java: ## @@ -32,36 +36,42 @@ public enum Type { private final Type type; +/** + * This identifies a particular event. It is used to disambiguate events via {@link #hashCode()} and + * {@link #equals(Object)} and can be used in log messages when debugging. + */ +private final Uuid id; + protected ApplicationEvent(Type type) { this.type = Objects.requireNonNull(type); +this.id = Uuid.randomUuid(); } public Type type() { return type; } -@Override -public boolean equals(Object o) { -if (this == o) return true; -if (o == null || getClass() != o.getClass()) return false; - -ApplicationEvent that = (ApplicationEvent) o; +public Uuid id() { +return id; +} -return type == that.type; +@Override +public final boolean equals(Object o) { +return this == o; } @Override -public int hashCode() { -return type.hashCode(); +public final int hashCode() { +return Objects.hash(type, id); } protected String toStringBase() { -return "type=" + type; +return "type=" + type + ", id=" + id; Review Comment: I was more on the performance reason side, but I also assumed that the performance benefits are negligible. I proposed it more to be on the safe side, thus the "nit" prefix. I am fine either way.
Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]
cadonna commented on code in PR #15438: URL: https://github.com/apache/kafka/pull/15438#discussion_r1507316466 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java: ## @@ -32,36 +36,42 @@ public enum Type { private final Type type; +/** + * This identifies a particular event. It is used to disambiguate events via {@link #hashCode()} and + * {@link #equals(Object)} and can be used in log messages when debugging. + */ +private final Uuid id; Review Comment: > I do like the idea of being able to determine the ordering of the events, though. Can we maybe have both -- globally unique IDs and a counter?
Re: [PR] KAFKA-16191: Clean up of consumer client internal events [kafka]
cadonna commented on code in PR #15438: URL: https://github.com/apache/kafka/pull/15438#discussion_r1507315536 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java: ## @@ -32,36 +36,42 @@ public enum Type { private final Type type; +/** + * This identifies a particular event. It is used to disambiguate events via {@link #hashCode()} and + * {@link #equals(Object)} and can be used in log messages when debugging. + */ +private final Uuid id; Review Comment: > I wanted the IDs to be "globally unique", which isn't going to happen if the client logs include multiple consumers. Perhaps that's an over-optimization? I hope the log context will tell you what consumer the events come from. However, I also agree that searching for globally unique IDs in logs is easier than searching for counters.
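The idea discussed in this thread, a globally unique ID for grepping logs combined with a monotonic counter for ordering, could look roughly like this. This is a sketch, not the actual `ApplicationEvent` code; it uses `java.util.UUID` in place of Kafka's `Uuid` and a hypothetical class name:

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

public class IdentifiedEvent {
    // Process-wide counter: gives a total order over event creation in this JVM.
    private static final AtomicLong SEQUENCE = new AtomicLong();

    private final UUID id = UUID.randomUUID();                      // globally unique, easy to grep in logs
    private final long sequenceNumber = SEQUENCE.incrementAndGet(); // ordering between events

    public UUID id() { return id; }
    public long sequenceNumber() { return sequenceNumber; }

    // Identity-based equality: two distinct event objects are never equal,
    // even if they happen to carry the same type and payload.
    @Override
    public final boolean equals(Object o) { return this == o; }

    @Override
    public final int hashCode() { return id.hashCode(); }

    @Override
    public String toString() {
        return "IdentifiedEvent(id=" + id + ", seq=" + sequenceNumber + ")";
    }
}
```

The counter orders events only within one JVM, which is why the unique ID is still useful when logs from several consumers (or processes) are interleaved.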
Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]
dajac commented on code in PR #15442: URL: https://github.com/apache/kafka/pull/15442#discussion_r1507296782 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3500,6 +3503,59 @@ public void maybeDeleteGroup(String groupId, List records) { } } +/** + * A group can be upgraded offline if it's a classic group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline upgrade is valid. + */ +private boolean validateOfflineUpgrade(String groupId) { +Group group = groups.get(groupId); + +if (group == null || group.type() == CONSUMER) { +return false; +} + +ClassicGroup classicGroup = (ClassicGroup) group; +if (!classicGroup.isEmpty()) { +return false; +} else { +return true; +} +} + +/** + * Upgrade the empty classic group to a consumer group if it's valid. + * + * @param groupId The group id to be updated. + * @param records The list of records to delete the classic group and create the consumer group. + * @param isSimpleGroup The boolean indicating whether the group to be updated is a simple group. + */ +public void maybeUpgradeEmptyGroup(String groupId, List records, boolean isSimpleGroup) { Review Comment: In this particular case where the group is empty and a new member joins, my understanding is that we only need to delete the previous group. The new group will be automatically created with the new member. Am I missing something? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3500,6 +3503,59 @@ public void maybeDeleteGroup(String groupId, List records) { } } +/** + * A group can be upgraded offline if it's a classic group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline upgrade is valid. 
+ */ +private boolean validateOfflineUpgrade(String groupId) { +Group group = groups.get(groupId); + +if (group == null || group.type() == CONSUMER) { +return false; +} + +ClassicGroup classicGroup = (ClassicGroup) group; +if (!classicGroup.isEmpty()) { +return false; +} else { +return true; +} +} + +/** + * Upgrade the empty classic group to a consumer group if it's valid. + * + * @param groupId The group id to be updated. + * @param records The list of records to delete the classic group and create the consumer group. + * @param isSimpleGroup The boolean indicating whether the group to be updated is a simple group. + */ +public void maybeUpgradeEmptyGroup(String groupId, List records, boolean isSimpleGroup) { +if (validateOfflineUpgrade(groupId)) { +final long currentTimeMs = time.milliseconds(); +ClassicGroup classicGroup = getOrMaybeCreateClassicGroup(groupId, false); +int groupEpoch = classicGroup.generationId(); + +// Replace the classic group with a new consumer group. +ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true); +// We don't create the tombstone because the replay will remove the newly created consumer group. Review Comment: I think that we must write a tombstone for the old group. As you said, the map will be updated based on the records with the new group. However, we also need to compact the old record for the group and the only way to do it is to write a tombstone.
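For background on the tombstone point in this review: the group coordinator stores group records in a compacted topic, where compaction keeps only the latest record per key, and a record with a null value (a tombstone) eventually removes the key altogether. A toy model of that end state (not Kafka code, just an illustration of compaction semantics):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    // Approximate the final state of log compaction: the latest value wins
    // per key, and a null value (a tombstone) removes the key entirely.
    public static Map<String, String> compact(List<SimpleEntry<String, String>> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (SimpleEntry<String, String> record : log) {
            if (record.getValue() == null) {
                latest.remove(record.getKey()); // tombstone: key disappears after compaction
            } else {
                latest.put(record.getKey(), record.getValue());
            }
        }
        return latest;
    }
}
```

This is why updating the in-memory map alone is not enough: without a tombstone record, the old classic-group record would survive compaction and be replayed on coordinator failover.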
Re: [PR] KAFKA-16313: offline group protocol upgrade [kafka]
dajac commented on code in PR #15442: URL: https://github.com/apache/kafka/pull/15442#discussion_r1507302228 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -474,6 +474,8 @@ public CoordinatorResult commitOffset( final List records = new ArrayList<>(); final long currentTimeMs = time.milliseconds(); final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs); +groupMetadataManager.maybeUpgradeEmptyGroup(group.groupId(), records, true); +final int initialRecordsSize = records.size(); Review Comment: Why are we doing this here? I am not sure to follow. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3500,6 +3503,59 @@ public void maybeDeleteGroup(String groupId, List records) { } } +/** + * A group can be upgraded offline if it's a classic group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline upgrade is valid. + */ +private boolean validateOfflineUpgrade(String groupId) { +Group group = groups.get(groupId); + +if (group == null || group.type() == CONSUMER) { +return false; +} + +ClassicGroup classicGroup = (ClassicGroup) group; +if (!classicGroup.isEmpty()) { +return false; +} else { +return true; +} +} + +/** + * Upgrade the empty classic group to a consumer group if it's valid. + * + * @param groupId The group id to be updated. + * @param records The list of records to delete the classic group and create the consumer group. + * @param isSimpleGroup The boolean indicating whether the group to be updated is a simple group. 
+ */ +public void maybeUpgradeEmptyGroup(String groupId, List records, boolean isSimpleGroup) { +if (validateOfflineUpgrade(groupId)) { +final long currentTimeMs = time.milliseconds(); +ClassicGroup classicGroup = getOrMaybeCreateClassicGroup(groupId, false); +int groupEpoch = classicGroup.generationId(); + +// Replace the classic group with a new consumer group. +ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true); +// We don't create the tombstone because the replay will remove the newly created consumer group. +removeGroup(groupId); +groups.put(groupId, consumerGroup); +metrics.onConsumerGroupStateTransition(null, consumerGroup.state()); + +if (!isSimpleGroup) { +records.add(newGroupSubscriptionMetadataRecord( +groupId, + consumerGroup.computeSubscriptionMetadata(classicGroup.subscribedTopics(), metadataImage.topics(), metadataImage.cluster()) +)); Review Comment: This does not seem necessary. If the group is empty, the subscribedTopics will also be empty. As I said earlier, we can let the handling of the new member create the new subscription.
Re: [PR] MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs [kafka]
dajac commented on code in PR #15432: URL: https://github.com/apache/kafka/pull/15432#discussion_r1507247226 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1695,8 +1683,8 @@ public void scheduleUnloadOperation( scheduleInternalOperation("UnloadCoordinator(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> { CoordinatorContext context = coordinators.get(tp); if (context != null) { +context.lock.lock(); Review Comment: I moved it for consistency, but it also seems better to lock before entering the try..catch, because unlocking when `lock()` fails would be wrong.
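The locking convention referenced in this comment, acquire before the try so the finally-unlock only runs when the lock is actually held, can be sketched as follows (the class and method are hypothetical, not the runtime's code):

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockBeforeTry {
    private final ReentrantLock lock = new ReentrantLock();

    // lock() is called *before* entering the try block: if it were inside the
    // try and threw, the finally clause would call unlock() on a lock we never
    // held, which itself throws IllegalMonitorStateException and masks the
    // original failure.
    public int guarded(int value) {
        lock.lock();
        try {
            return value * 2; // critical section
        } finally {
            lock.unlock();
        }
    }
}
```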
Re: [PR] MINOR: Fix UpdatedImage and HighWatermarkUpdated events' logs [kafka]
dajac commented on code in PR #15432: URL: https://github.com/apache/kafka/pull/15432#discussion_r1507246125 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1731,15 +1735,19 @@ public void onNewMetadataImage( // Push an event for each coordinator. coordinators.keySet().forEach(tp -> { scheduleInternalOperation("UpdateImage(tp=" + tp + ", offset=" + newImage.offset() + ")", tp, () -> { -withContextOrThrow(tp, context -> { -if (context.state == CoordinatorState.ACTIVE) { +CoordinatorContext context = coordinators.get(tp); +if (context != null && context.state == CoordinatorState.ACTIVE) { Review Comment: Oh. Nice catch. This is a bug. Let me fix it.
[jira] [Resolved] (KAFKA-15462) Add group type filter to the admin client
[ https://issues.apache.org/jira/browse/KAFKA-15462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-15462. - Fix Version/s: 3.8.0 Resolution: Fixed > Add group type filter to the admin client > - > > Key: KAFKA-15462 > URL: https://issues.apache.org/jira/browse/KAFKA-15462 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Ritika Reddy >Priority: Major > Fix For: 3.8.0 > >
Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]
dajac merged PR #15150: URL: https://github.com/apache/kafka/pull/15150