Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-03-04 Thread via GitHub


cadonna merged PR #15426:
URL: https://github.com/apache/kafka/pull/15426


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-03-03 Thread via GitHub


AndrewJSchofield commented on code in PR #15426:
URL: https://github.com/apache/kafka/pull/15426#discussion_r1510345836


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) {
 }
 return PollResult.EMPTY;
 }
+
+List<MemberStateListener> stateListeners() {

Review Comment:
   I agree.






Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-03-01 Thread via GitHub


kirktrue commented on PR #15426:
URL: https://github.com/apache/kafka/pull/15426#issuecomment-1974135452

   > @kirktrue are you fine with merging this PR and coming back to this after 3.8?
   
   Yes. Unfortunately, I think this is an area where we need a more holistic design review.





Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-29 Thread via GitHub


cadonna commented on PR #15426:
URL: https://github.com/apache/kafka/pull/15426#issuecomment-1971644323

   @kirktrue are you fine with merging this PR and coming back to this after 
3.8?





Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


kirktrue commented on PR #15426:
URL: https://github.com/apache/kafka/pull/15426#issuecomment-1968025786

   @cadonna—thanks for the PR!
   
   I'm concerned that the `MemberStateListener` mechanism has opened a path for us
   to sidestep the thread separation we've intentionally introduced. With this
   listener mechanism, the `MembershipManagerImpl` can invoke "arbitrary" code,
   which might interact with the application thread in unexpected ways.
   
   With all of that said, I do recognize the issue we're facing with the 
existing 'group metadata update event' mechanism, so it's clear we do need to 
tweak the design a bit. I'm just not confident we've nailed it, yet.
   
   I will spend some time looking at this, and related, PRs and Jiras tomorrow.
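
To make the concern above concrete, here is a minimal sketch, not the actual Kafka code. `EpochListener`, `Membership`, and `ListenerThreadSketch` are hypothetical names introduced only for illustration. It shows that a listener registered from one thread runs on whichever thread fires the update, so the listener body must be safe to execute there.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a member state listener; not Kafka's interface.
interface EpochListener {
    void onEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId);
}

// Hypothetical stand-in for the component that owns the membership state.
class Membership {
    private final List<EpochListener> listeners = new CopyOnWriteArrayList<>();

    void register(EpochListener listener) {
        listeners.add(listener);
    }

    // Runs every registered listener on the calling thread.
    void updateEpoch(int epoch, String memberId) {
        for (EpochListener listener : listeners) {
            listener.onEpochUpdated(Optional.of(epoch), Optional.of(memberId));
        }
    }
}

class ListenerThreadSketch {
    // Returns the name of the thread that executed the listener body.
    static String listenerThreadName() {
        AtomicReference<String> executedOn = new AtomicReference<>();
        Membership membership = new Membership();
        // Registered from the calling ("application") thread ...
        membership.register((epoch, id) -> executedOn.set(Thread.currentThread().getName()));
        // ... but invoked from the thread that drives the update.
        Thread background = new Thread(() -> membership.updateEpoch(1, "member-1"), "background");
        background.start();
        try {
            background.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return executedOn.get();
    }

    public static void main(String[] args) {
        System.out.println("listener ran on: " + listenerThreadName());
    }
}
```

In this sketch the listener only writes to an `AtomicReference`, which is safe from any thread; a listener that touched application-thread-only state would not be.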





Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


kirktrue commented on PR #15426:
URL: https://github.com/apache/kafka/pull/15426#issuecomment-1968017311

   > Did we take any decision about this architectural change proposed in [issues.apache.org/jira/browse/KAFKA-16290](https://issues.apache.org/jira/browse/KAFKA-16290)?
   
   Not that I'm aware of, no. I grabbed the ticket so that it would show up on my list of things to worry about.
   
   > @kirktrue @philipnee @lianetm @dajac Does anybody of you know why we chose a shared object instead of sending events between the threads?
   
   IIRC, there were two reasons we felt comfortable using the 
`SubscriptionState` object directly:
   
   1. It is heavily synchronized
   2. We didn't want to add overhead for simple operations like looking up the 
subscribed topics, etc.
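
The trade-off behind the two reasons above can be sketched roughly as follows. This is an illustration under stated assumptions, not Kafka's implementation: `SharedSubscription` is a hypothetical synchronized object standing in for the shared state, and `lookupViaEvent` is a hypothetical request/reply round trip to an owning thread.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical shared state guarded by its own monitor; not SubscriptionState itself.
class SharedSubscription {
    private final Set<String> topics = new HashSet<>();

    synchronized void subscribe(String topic) {
        topics.add(topic);
    }

    // Direct read: one lock acquisition and a defensive copy.
    synchronized Set<String> subscription() {
        return Collections.unmodifiableSet(new HashSet<>(topics));
    }
}

class SharedVsEventSketch {
    // Event-based alternative: enqueue a request and wait for the owning thread to reply.
    static Set<String> lookupViaEvent(SharedSubscription state) {
        BlockingQueue<CompletableFuture<Set<String>>> requests = new LinkedBlockingQueue<>();
        Thread owner = new Thread(() -> {
            try {
                // The owning thread answers exactly one request in this sketch.
                requests.take().complete(state.subscription());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "background");
        owner.start();
        CompletableFuture<Set<String>> reply = new CompletableFuture<>();
        requests.add(reply);
        return reply.join();
    }

    public static void main(String[] args) {
        SharedSubscription state = new SharedSubscription();
        state.subscribe("orders");
        System.out.println("direct read: " + state.subscription());
        System.out.println("via event:   " + lookupViaEvent(state));
    }
}
```

Both paths return the same data; the event path adds a queue hand-off and a blocking wait for what is otherwise a simple lookup.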





Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


cadonna commented on code in PR #15426:
URL: https://github.com/apache/kafka/pull/15426#discussion_r1504623635


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) {
 }
 return PollResult.EMPTY;
 }
+
+List<MemberStateListener> stateListeners() {

Review Comment:
   I do not see much value for the reader. As always, I see the risk that the
   method eventually ends up being used outside of tests, and then we have a lying
   comment in the code.






Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


lucasbru commented on code in PR #15426:
URL: https://github.com/apache/kafka/pull/15426#discussion_r1504333691


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) {
 }
 return PollResult.EMPTY;
 }
+
+List<MemberStateListener> stateListeners() {

Review Comment:
   I'm not saying it's "so important", but it's nice for the reader, no? I 
won't insist.






Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


cadonna commented on code in PR #15426:
URL: https://github.com/apache/kafka/pull/15426#discussion_r1504158556


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) {
 }
 return PollResult.EMPTY;
 }
+
+List<MemberStateListener> stateListeners() {

Review Comment:
   I do not understand why it is so important to state that this method is only 
used in tests on the implementation class.






Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


cadonna commented on code in PR #15426:
URL: https://github.com/apache/kafka/pull/15426#discussion_r1504144313


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) {
 }
 return PollResult.EMPTY;
 }
+
+List<MemberStateListener> stateListeners() {

Review Comment:
   I can add it, but I really do not like those kinds of inline comments.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) {
 }
 return PollResult.EMPTY;
 }
+
+List<MemberStateListener> stateListeners() {

Review Comment:
   I can add it, but I really do not like those kinds of inline comments in the code.






Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


cadonna commented on code in PR #15426:
URL: https://github.com/apache/kafka/pull/15426#discussion_r1504156212


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
 private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
 private final AtomicInteger refCount = new AtomicInteger(0);
 
+private final InvalidGroupIdException invalidGroupIdException =

Review Comment:
   I removed it.






Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


cadonna commented on PR #15426:
URL: https://github.com/apache/kafka/pull/15426#issuecomment-1966444988

   > Thanks for the PR! I left some comments
   > 
   > Architecturally, this is going a bit against 
https://issues.apache.org/jira/browse/KAFKA-16290 which proposes propagating 
the subscription state via events to the application thread, which would 
resolve the ordering in another way. Do you have thoughts on that?
   
   Did we take any decision about this architectural change proposed in 
https://issues.apache.org/jira/browse/KAFKA-16290?
   
   In general I am in favor of the change, but I may be missing the reason we opted 
to use a shared object.

   @kirktrue @philipnee @lianetm @dajac Does anybody of you know why we chose a 
shared object instead of sending events between the threads? 
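
For readers unfamiliar with the alternative being discussed, propagating state via events could look roughly like the sketch below. It is only an illustration of the general idea: `EpochUpdatedEvent` and `EventPropagationSketch` are hypothetical names, and the real proposal in KAFKA-16290 may differ in every detail.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event carrying the state the background thread wants to publish.
final class EpochUpdatedEvent {
    final int memberEpoch;
    final String memberId;

    EpochUpdatedEvent(int memberEpoch, String memberId) {
        this.memberEpoch = memberEpoch;
        this.memberId = memberId;
    }
}

class EventPropagationSketch {
    // The queue is the only object shared between the two threads.
    private final BlockingQueue<EpochUpdatedEvent> backgroundEvents = new LinkedBlockingQueue<>();
    // Touched only by the application thread, so it needs no synchronization.
    private Optional<EpochUpdatedEvent> latest = Optional.empty();

    // Called by the background thread.
    void publish(int memberEpoch, String memberId) {
        backgroundEvents.add(new EpochUpdatedEvent(memberEpoch, memberId));
    }

    // Called by the application thread; applies events in the order they were enqueued.
    Optional<EpochUpdatedEvent> processEvents() {
        EpochUpdatedEvent event;
        while ((event = backgroundEvents.poll()) != null) {
            latest = Optional.of(event);
        }
        return latest;
    }

    public static void main(String[] args) throws InterruptedException {
        EventPropagationSketch sketch = new EventPropagationSketch();
        Thread background = new Thread(() -> {
            sketch.publish(1, "member-1");
            sketch.publish(2, "member-1");
        }, "background");
        background.start();
        background.join();
        System.out.println("latest epoch: " + sketch.processEvents().get().memberEpoch);
    }
}
```

With this shape, the application thread sees updates only when it drains the queue, which is the ordering property the event approach relies on.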





Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


cadonna commented on code in PR #15426:
URL: https://github.com/apache/kafka/pull/15426#discussion_r1504146104


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1186,8 +1181,7 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
 public ConsumerGroupMetadata groupMetadata() {
 acquireAndEnsureOpen();
 try {
-maybeThrowInvalidGroupIdException();
-return groupMetadata.get();
+return groupMetadata.get().orElseThrow(() -> invalidGroupIdException);

Review Comment:
   Done



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
 private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
 private final AtomicInteger refCount = new AtomicInteger(0);
 
+private final InvalidGroupIdException invalidGroupIdException =
+new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
+"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
+
+private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Optional<String> memberId) {

Review Comment:
   Done






Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


cadonna commented on code in PR #15426:
URL: https://github.com/apache/kafka/pull/15426#discussion_r1504145709


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.test.TestUtils.requiredConsumerConfig;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class RequestManagersTest {
+
+@Test
+public void testMemberStateListenerRegistered() {
+
+final MemberStateListener listener = (memberEpoch, memberId) -> {

Review Comment:
   Done






Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


cadonna commented on code in PR #15426:
URL: https://github.com/apache/kafka/pull/15426#discussion_r1504144313


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) {
 }
 return PollResult.EMPTY;
 }
+
+List<MemberStateListener> stateListeners() {

Review Comment:
   I can add it, but I really do not like those kinds of comments.






Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


cadonna commented on code in PR #15426:
URL: https://github.com/apache/kafka/pull/15426#discussion_r1504099715


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
 private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
 private final AtomicInteger refCount = new AtomicInteger(0);
 
+private final InvalidGroupIdException invalidGroupIdException =
+new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
+"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
+
+private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Optional<String> memberId) {
+groupMetadata.updateAndGet(

Review Comment:
   The application thread can also update the group metadata when unsubscribing.






Re: [PR] KAFKA-16285: Make group metadata available when a new assignment is set [kafka]

2024-02-27 Thread via GitHub


lucasbru commented on code in PR #15426:
URL: https://github.com/apache/kafka/pull/15426#discussion_r1504029285


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
 private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
 private final AtomicInteger refCount = new AtomicInteger(0);
 
+private final InvalidGroupIdException invalidGroupIdException =
+new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
+"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
+
+private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Optional<String> memberId) {

Review Comment:
   Please move this method. We shouldn't put private methods before the 
constructor.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
 private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
 private final AtomicInteger refCount = new AtomicInteger(0);
 
+private final InvalidGroupIdException invalidGroupIdException =
+new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
+"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
+
+private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Optional<String> memberId) {
+groupMetadata.updateAndGet(

Review Comment:
   Can we not just use a `volatile` variable here? Only the background thread
   updates the epoch and the ID, with the exception of the shutdown procedure in
   the application thread. A race between the background thread and the
   application thread, however, doesn't really seem to be solved by using
   `updateAndGet` (that would require checking whether the consumer group metadata
   is "closed" here).

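The point about `volatile` versus `updateAndGet` can be illustrated with a small sketch. It is not the PR's code: `MetaSnapshot`, `VolatileHolder`, and `AtomicHolder` are hypothetical names, and the "closed" flag is an assumption made to model the shutdown case. The late update is replayed sequentially here to simulate the interleaving in which the background thread writes after the application thread has shut down.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable snapshot; not ConsumerGroupMetadata.
final class MetaSnapshot {
    final int epoch;
    final boolean closed;

    MetaSnapshot(int epoch, boolean closed) {
        this.epoch = epoch;
        this.closed = closed;
    }
}

// Volatile field: writes are visible to other threads, but every write is blind.
class VolatileHolder {
    private volatile MetaSnapshot meta = new MetaSnapshot(0, false);

    void updateEpoch(int epoch) {
        meta = new MetaSnapshot(epoch, false);
    }

    void close() {
        meta = new MetaSnapshot(-1, true);
    }

    MetaSnapshot get() {
        return meta;
    }
}

// AtomicReference: the update function sees the current value and can decline to overwrite it.
class AtomicHolder {
    private final AtomicReference<MetaSnapshot> meta =
        new AtomicReference<>(new MetaSnapshot(0, false));

    void updateEpoch(int epoch) {
        // The atomic read-modify-write only helps because of this explicit "closed" check.
        meta.updateAndGet(current -> current.closed ? current : new MetaSnapshot(epoch, false));
    }

    void close() {
        meta.set(new MetaSnapshot(-1, true));
    }

    MetaSnapshot get() {
        return meta.get();
    }
}

class VolatileVsAtomicSketch {
    public static void main(String[] args) {
        VolatileHolder v = new VolatileHolder();
        v.close();
        v.updateEpoch(5); // late background update overwrites the closed state

        AtomicHolder a = new AtomicHolder();
        a.close();
        a.updateEpoch(5); // late background update is ignored

        System.out.println("volatile still closed: " + v.get().closed);
        System.out.println("atomic still closed:   " + a.get().closed);
    }
}
```

Without the `current.closed` check, `updateAndGet` behaves like the blind volatile write for this race, which is the reviewer's argument.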


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1186,8 +1181,7 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
 public ConsumerGroupMetadata groupMetadata() {
 acquireAndEnsureOpen();
 try {
-maybeThrowInvalidGroupIdException();
-return groupMetadata.get();
+return groupMetadata.get().orElseThrow(() -> invalidGroupIdException);

Review Comment:
   I'd prefer calling `maybeThrowInvalidGroupIdException` here.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1372,4 +1373,8 @@ public PollResult poll(final long currentTimeMs) {
 }
 return PollResult.EMPTY;
 }
+
+List<MemberStateListener> stateListeners() {

Review Comment:
   // for testing only



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -293,6 +276,23 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
 private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
 private final AtomicInteger refCount = new AtomicInteger(0);
 
+private final InvalidGroupIdException invalidGroupIdException =

Review Comment:
   We will have one such instance for each consumer in memory at all times. Not 
a biggie, but I'd avoid it.
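
One way to avoid keeping an exception instance per consumer is to construct it inside the supplier passed to `orElseThrow`, so it exists only when it is thrown. The sketch below is an assumption-laden illustration, not the change that was merged: `LazyExceptionSketch` is a hypothetical class, a `String` stands in for the group metadata, and `IllegalStateException` stands in for `InvalidGroupIdException`.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

class LazyExceptionSketch {
    // Hypothetical stand-in for the consumer's group metadata; a String keeps the sketch small.
    private final AtomicReference<Optional<String>> groupMetadata;

    LazyExceptionSketch(Optional<String> initial) {
        this.groupMetadata = new AtomicReference<>(initial);
    }

    String groupMetadata() {
        // The supplier runs only when the Optional is empty, so no exception object
        // is held in a field for the lifetime of the instance.
        return groupMetadata.get().orElseThrow(
            () -> new IllegalStateException("A valid group.id is required for this operation."));
    }

    public static void main(String[] args) {
        System.out.println(new LazyExceptionSketch(Optional.of("group-a")).groupMetadata());
        try {
            new LazyExceptionSketch(Optional.empty()).groupMetadata();
        } catch (IllegalStateException e) {
            System.out.println("thrown: " + e.getMessage());
        }
    }
}
```

A side effect of creating the exception at the throw site is that its stack trace is captured there, rather than at the point where the owning object was constructed.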



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import