Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna merged PR #15426: URL: https://github.com/apache/kafka/pull/15426 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
AndrewJSchofield commented on code in PR #15426: URL: https://github.com/apache/kafka/pull/15426#discussion_r1510345836 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) { } return PollResult.EMPTY; } + +List<MemberStateListener> stateListeners() { Review Comment: I agree.
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
kirktrue commented on PR #15426: URL: https://github.com/apache/kafka/pull/15426#issuecomment-1974135452 > @kirktrue are you fine with merging this PR and coming back to this after 3.8? Yes. Unfortunately, I think this is an area where we need a more holistic design review.
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna commented on PR #15426: URL: https://github.com/apache/kafka/pull/15426#issuecomment-1971644323 @kirktrue are you fine with merging this PR and coming back to this after 3.8?
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
kirktrue commented on PR #15426: URL: https://github.com/apache/kafka/pull/15426#issuecomment-1968025786 @cadonna, thanks for the PR! I'm concerned that the `MemberStateListener` mechanism has opened a path for us to sidestep the thread separation we've intentionally introduced. With this listener mechanism, the `MembershipManagerImpl` can invoke "arbitrary" code which might interact with the application thread in unexpected ways. With all of that said, I do recognize the issue we're facing with the existing 'group metadata update event' mechanism, so it's clear we do need to tweak the design a bit. I'm just not confident we've nailed it yet. I will spend some time tomorrow looking at this PR and the related PRs and Jiras.
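To make the thread-separation concern above concrete, here is a minimal sketch, not the Kafka implementation. `EpochListener` and `BackgroundMembership` are hypothetical stand-ins for the listener interface and the component that owns it. The sketch only shows that a registered callback runs on whichever thread triggers the update, which is why a listener body should do no more than publish a value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public class ListenerThreadSketch {

    // Hypothetical stand-in for the listener interface discussed in the thread.
    interface EpochListener {
        void onMemberEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId);
    }

    // Hypothetical stand-in for the component that is driven by the background thread.
    static final class BackgroundMembership {
        private final List<EpochListener> listeners = new ArrayList<>();

        void register(EpochListener listener) {
            listeners.add(listener);
        }

        // Listeners execute synchronously on the thread that calls this method.
        void epochChanged(int epoch, String memberId) {
            for (EpochListener listener : listeners) {
                listener.onMemberEpochUpdated(Optional.of(epoch), Optional.of(memberId));
            }
        }
    }

    // Registers a listener, fires it from a separate thread, and returns the
    // name of the thread that actually executed the callback.
    static String threadThatRanListener() {
        AtomicReference<String> observed = new AtomicReference<>();
        BackgroundMembership membership = new BackgroundMembership();
        membership.register((epoch, id) -> observed.set(Thread.currentThread().getName()));

        Thread background = new Thread(() -> membership.epochChanged(1, "member-1"), "background-thread");
        background.start();
        try {
            background.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed.get();
    }

    public static void main(String[] args) {
        System.out.println("listener ran on: " + threadThatRanListener());
    }
}
```

In this sketch the callback only stores a value in an `AtomicReference`; anything heavier would run on the background thread as well.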
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
kirktrue commented on PR #15426: URL: https://github.com/apache/kafka/pull/15426#issuecomment-1968017311 > Did we make a decision about the architectural change proposed in [issues.apache.org/jira/browse/KAFKA-16290](https://issues.apache.org/jira/browse/KAFKA-16290)? Not that I'm aware of, no. I grabbed the ticket so that it would show up on my list of things to worry about. > @kirktrue @philipnee @lianetm @dajac Do any of you know why we chose a shared object instead of sending events between the threads? IIRC, there were two reasons we felt comfortable using the `SubscriptionState` object directly: 1. It is heavily synchronized. 2. We didn't want to add overhead for simple operations like looking up the subscribed topics, etc.
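The two designs weighed above can be sketched side by side. This is an illustration under assumptions, not Kafka code: `SharedSubscription`, `SubscriptionChanged`, and `EventDrivenView` are hypothetical names. The first variant is a synchronized object that both threads touch directly; the second passes immutable events through a queue and lets the reading thread keep its own copy.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SharedStateVsEventsSketch {

    // Option 1: one object shared by both threads, guarded by its own monitor.
    // Reads are cheap and always current, at the cost of shared mutable state.
    static final class SharedSubscription {
        private final Set<String> topics = new HashSet<>();

        synchronized void subscribe(String topic) {
            topics.add(topic);
        }

        synchronized Set<String> subscription() {
            return new HashSet<>(topics);
        }
    }

    // Option 2: the writer publishes immutable events; the reader applies them
    // to a private copy, so no mutable state is shared between the threads.
    static final class SubscriptionChanged {
        final Set<String> topics;

        SubscriptionChanged(Set<String> topics) {
            this.topics = Collections.unmodifiableSet(new HashSet<>(topics));
        }
    }

    static final class EventDrivenView {
        private final BlockingQueue<SubscriptionChanged> queue = new LinkedBlockingQueue<>();
        private Set<String> lastSeen = Collections.emptySet();

        // Called by the writing thread.
        void publish(Set<String> topics) {
            queue.add(new SubscriptionChanged(topics));
        }

        // Called by the reading thread only: drain pending events, keep the latest.
        Set<String> subscription() {
            SubscriptionChanged event;
            while ((event = queue.poll()) != null) {
                lastSeen = event.topics;
            }
            return lastSeen;
        }
    }

    public static void main(String[] args) {
        SharedSubscription shared = new SharedSubscription();
        shared.subscribe("orders");
        System.out.println(shared.subscription());

        EventDrivenView view = new EventDrivenView();
        view.publish(Collections.singleton("orders"));
        System.out.println(view.subscription());
    }
}
```

The event variant only reflects a change once the reader drains the queue, which is the extra overhead the second reason above refers to.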
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna commented on code in PR #15426: URL: https://github.com/apache/kafka/pull/15426#discussion_r1504623635 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) { } return PollResult.EMPTY; } + +List<MemberStateListener> stateListeners() { Review Comment: I do not see much value for the reader. As always, I see the risk that the method will eventually be used outside of tests, and then we have a lying comment in the code.
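For readers following the `stateListeners()` discussion, a test-facing accessor of this kind is often written as a package-private method that returns a read-only view. The sketch below is an assumption about the general pattern, not the code in this PR; `Manager` and `Listener` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TestAccessorSketch {

    // Hypothetical listener type, used only for this illustration.
    interface Listener {
        void onUpdate(int epoch);
    }

    static final class Manager {
        private final List<Listener> listeners = new ArrayList<>();

        void register(Listener listener) {
            listeners.add(listener);
        }

        // No access modifier: visible to tests in the same package, not part of
        // the public API. The read-only view keeps callers from mutating the list.
        List<Listener> stateListeners() {
            return Collections.unmodifiableList(listeners);
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager();
        manager.register(epoch -> { });
        System.out.println("registered listeners: " + manager.stateListeners().size());
    }
}
```

Package-private visibility already limits who can call the accessor, so the intent is expressed in code regardless of whether an inline comment is added.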
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
lucasbru commented on code in PR #15426: URL: https://github.com/apache/kafka/pull/15426#discussion_r1504333691 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) { } return PollResult.EMPTY; } + +List<MemberStateListener> stateListeners() { Review Comment: I'm not saying it's "so important", but it's nice for the reader, no? I won't insist.
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna commented on code in PR #15426: URL: https://github.com/apache/kafka/pull/15426#discussion_r1504158556 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) { } return PollResult.EMPTY; } + +List<MemberStateListener> stateListeners() { Review Comment: I do not understand why it is so important to state that this method is only used in tests on the implementation class.
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna commented on code in PR #15426: URL: https://github.com/apache/kafka/pull/15426#discussion_r1504144313 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) { } return PollResult.EMPTY; } + +List<MemberStateListener> stateListeners() { Review Comment: I can add it, but I really do not like those kinds of inline comments in the code.
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna commented on code in PR #15426: URL: https://github.com/apache/kafka/pull/15426#discussion_r1504156212 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); private final AtomicInteger refCount = new AtomicInteger(0); +private final InvalidGroupIdException invalidGroupIdException = Review Comment: I removed it.
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna commented on PR #15426: URL: https://github.com/apache/kafka/pull/15426#issuecomment-1966444988 > Thanks for the PR! I left some comments > > Architecturally, this is going a bit against https://issues.apache.org/jira/browse/KAFKA-16290 which proposes propagating the subscription state via events to the application thread, which would resolve the ordering in another way. Do you have thoughts on that? Did we make a decision about the architectural change proposed in https://issues.apache.org/jira/browse/KAFKA-16290? In general I am in favor of the change, but I might be missing the reason we opted to use a shared object. @kirktrue @philipnee @lianetm @dajac Do any of you know why we chose a shared object instead of sending events between the threads?
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna commented on code in PR #15426: URL: https://github.com/apache/kafka/pull/15426#discussion_r1504146104 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1186,8 +1181,7 @@ public OptionalLong currentLag(TopicPartition topicPartition) { public ConsumerGroupMetadata groupMetadata() { acquireAndEnsureOpen(); try { -maybeThrowInvalidGroupIdException(); -return groupMetadata.get(); +return groupMetadata.get().orElseThrow(() -> invalidGroupIdException); Review Comment: Done ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); private final AtomicInteger refCount = new AtomicInteger(0); +private final InvalidGroupIdException invalidGroupIdException = +new InvalidGroupIdException("To use the group management or offset commit APIs, you must " + +"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration."); + +private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Optional<String> memberId) { Review Comment: Done
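The `orElseThrow` hunk above holds the metadata as an `Optional` inside an atomic reference and throws when it is absent. A minimal sketch of that shape follows, assuming hypothetical names (`GroupInfo`, `Holder`) and a plain `IllegalStateException` in place of Kafka's exception type. Here the supplier builds the exception on demand, which is one way to avoid keeping a pre-built instance per consumer; it is not claimed to be what the PR finally did.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public class OrElseThrowSketch {

    // Hypothetical value type standing in for the group metadata.
    static final class GroupInfo {
        final String groupId;

        GroupInfo(String groupId) {
            this.groupId = groupId;
        }
    }

    static final class Holder {
        private final AtomicReference<Optional<GroupInfo>> ref;

        Holder(Optional<GroupInfo> initial) {
            this.ref = new AtomicReference<>(initial);
        }

        // The supplier runs only when the Optional is empty, so the exception
        // (and its stack trace) is created at the failing call, not up front.
        GroupInfo groupInfo() {
            return ref.get().orElseThrow(
                () -> new IllegalStateException("a valid group id is required for this operation"));
        }
    }

    public static void main(String[] args) {
        Holder configured = new Holder(Optional.of(new GroupInfo("my-group")));
        System.out.println(configured.groupInfo().groupId);

        Holder missing = new Holder(Optional.empty());
        try {
            missing.groupInfo();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```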
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna commented on code in PR #15426: URL: https://github.com/apache/kafka/pull/15426#discussion_r1504145709 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.test.TestUtils.requiredConsumerConfig; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class RequestManagersTest { + +@Test +public void testMemberStateListenerRegistered() { + +final MemberStateListener listener = (memberEpoch, memberId) -> { Review Comment: Done
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna commented on code in PR #15426: URL: https://github.com/apache/kafka/pull/15426#discussion_r1504144313 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) { } return PollResult.EMPTY; } + +List<MemberStateListener> stateListeners() { Review Comment: I can add it, but I really do not like those kinds of comments.
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
cadonna commented on code in PR #15426: URL: https://github.com/apache/kafka/pull/15426#discussion_r1504099715 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); private final AtomicInteger refCount = new AtomicInteger(0); +private final InvalidGroupIdException invalidGroupIdException = +new InvalidGroupIdException("To use the group management or offset commit APIs, you must " + +"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration."); + +private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Optional<String> memberId) { +groupMetadata.updateAndGet( Review Comment: The application thread can also update the group metadata when unsubscribing.
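The point about two writers is what separates `updateAndGet` from a plain `volatile` field. A `volatile` write is visible to other threads, but an update derived from the current value (read, build a new object, write back) can overwrite a concurrent change. `AtomicReference.updateAndGet` retries the function until its compare-and-set succeeds. The sketch below uses a hypothetical `Snapshot` type with two counters, one per writing thread; it illustrates the primitive only and does not address the ordering question about a "closed" state raised in the review.

```java
import java.util.concurrent.atomic.AtomicReference;

public class UpdateAndGetSketch {

    // Immutable snapshot. The two counters are hypothetical stand-ins for
    // state written by the background thread and by the application thread.
    static final class Snapshot {
        final int backgroundWrites;
        final int applicationWrites;

        Snapshot(int backgroundWrites, int applicationWrites) {
            this.backgroundWrites = backgroundWrites;
            this.applicationWrites = applicationWrites;
        }
    }

    // Two threads each derive a new snapshot from the current one. Because
    // updateAndGet re-applies the function when another thread won the race,
    // neither thread's increments are lost.
    static Snapshot runTwoWriters(int updatesPerThread) {
        AtomicReference<Snapshot> ref = new AtomicReference<>(new Snapshot(0, 0));

        Thread background = new Thread(() -> {
            for (int i = 0; i < updatesPerThread; i++) {
                ref.updateAndGet(s -> new Snapshot(s.backgroundWrites + 1, s.applicationWrites));
            }
        });
        Thread application = new Thread(() -> {
            for (int i = 0; i < updatesPerThread; i++) {
                ref.updateAndGet(s -> new Snapshot(s.backgroundWrites, s.applicationWrites + 1));
            }
        });

        background.start();
        application.start();
        try {
            background.join();
            application.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ref.get();
    }

    public static void main(String[] args) {
        Snapshot result = runTwoWriters(10_000);
        System.out.println(result.backgroundWrites + " / " + result.applicationWrites);
    }
}
```

With a `volatile Snapshot` field and the same read-then-write pattern, either counter could end below the number of updates issued, because one thread can write back a snapshot built from a stale read.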
Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]
lucasbru commented on code in PR #15426: URL: https://github.com/apache/kafka/pull/15426#discussion_r1504029285 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); private final AtomicInteger refCount = new AtomicInteger(0); +private final InvalidGroupIdException invalidGroupIdException = +new InvalidGroupIdException("To use the group management or offset commit APIs, you must " + +"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration."); + +private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Optional<String> memberId) { Review Comment: Please move this method. We shouldn't put private methods before the constructor. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); private final AtomicInteger refCount = new AtomicInteger(0); +private final InvalidGroupIdException invalidGroupIdException = +new InvalidGroupIdException("To use the group management or offset commit APIs, you must " + +"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration."); + +private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Optional<String> memberId) { +groupMetadata.updateAndGet( Review Comment: Can we not just use a `volatile` variable here? We only have the background thread updating the epoch and the ID, with the exception of the shutdown procedure in the application thread. A race between background thread and application thread however doesn't really seem to be solved by using `updateAndGet` (that would require checking if the consumer group metadata is "closed" here). 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1186,8 +1181,7 @@ public OptionalLong currentLag(TopicPartition topicPartition) { public ConsumerGroupMetadata groupMetadata() { acquireAndEnsureOpen(); try { -maybeThrowInvalidGroupIdException(); -return groupMetadata.get(); +return groupMetadata.get().orElseThrow(() -> invalidGroupIdException); Review Comment: I'd prefer calling `maybeThrowInvalidGroupIdException` here. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) { } return PollResult.EMPTY; } + +List<MemberStateListener> stateListeners() { Review Comment: // for testing only ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); private final AtomicInteger refCount = new AtomicInteger(0); +private final InvalidGroupIdException invalidGroupIdException = Review Comment: We will have one such instance for each consumer in memory at all times. Not a biggie, but I'd avoid it. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import