Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-14 Thread via GitHub


kirktrue closed pull request #14672: KAFKA-15281: Implement the groupMetadata 
Consumer API
URL: https://github.com/apache/kafka/pull/14672


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-14 Thread via GitHub


kirktrue commented on PR #14672:
URL: https://github.com/apache/kafka/pull/14672#issuecomment-1856533985

   This was implemented by @cadonna in #14879, so closing this pull request.





Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-05 Thread via GitHub


cadonna merged PR #14879:
URL: https://github.com/apache/kafka/pull/14879





Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-05 Thread via GitHub


kirktrue commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1415960044


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -292,6 +297,90 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) {
         assertNull(heartbeatRequest.data().subscribedTopicRegex());
     }
 
+    @Test
+    public void testConsumerGroupMetadataFirstUpdate() {
+        resetWithZeroHeartbeatInterval(Optional.empty());
+        mockStableMember();
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", )));
+
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
+        ClientResponse response = createHeartbeatResponse(request, Errors.NONE);
+        result.unsentRequests.get(0).handler().onComplete(response);
+        assertEquals(1, backgroundEventQueue.size());
+        final BackgroundEvent event = backgroundEventQueue.poll();
+        assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type());
+        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = (GroupMetadataUpdateEvent) event;
+        final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+            memberEpoch,
+            memberId
+        );
+        assertEquals(expectedGroupMetadataUpdateEvent, groupMetadataUpdateEvent);
+    }
+
+    @Test
+    public void testConsumerGroupMetadataUpdateWithSameUpdate() {
+        resetWithZeroHeartbeatInterval(Optional.empty());
+        mockStableMember();
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", )));
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
+        ClientResponse firstResponse = createHeartbeatResponse(request, Errors.NONE);
+        request.handler().onComplete(firstResponse);
+        assertEquals(1, backgroundEventQueue.size());
+        final BackgroundEvent firstEvent = backgroundEventQueue.poll();
+        assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, firstEvent.type());
+
+        time.sleep(2000);

Review Comment:
   That's the intention of `resetWithZeroHeartbeatInterval()`, yes. At some
   point it did as it said.




Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-05 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1415421728


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -232,6 +235,16 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr
         this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
         this.heartbeatRequestState.resetTimer();
         this.membershipManager.onHeartbeatResponseReceived(response.data());
+        if (previousGroupMetadataUdateEvent == null ||
+            previousGroupMetadataUdateEvent.memberEpoch() != membershipManager.memberEpoch()) {
+
+            final GroupMetadataUpdateEvent currentGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+                membershipManager.memberEpoch(),
+                previousGroupMetadataUdateEvent == null ? membershipManager.memberId() : previousGroupMetadataUdateEvent.memberId()
+            );
+            this.backgroundEventHandler.add(currentGroupMetadataUpdateEvent);
+            previousGroupMetadataUdateEvent = currentGroupMetadataUpdateEvent;
+        }

Review Comment:
   I think you are right! Let me change that.






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-05 Thread via GitHub


lucasbru commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1415340659


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -232,6 +233,12 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr
         this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
         this.heartbeatRequestState.resetTimer();
         this.membershipManager.onHeartbeatResponseReceived(response.data());
+        this.backgroundEventHandler.add(new GroupMetadataUpdateEvent(

Review Comment:
   Creating a small follow-up ticket would have been fine as well, but this
   looks good.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -232,6 +235,16 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr
         this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
         this.heartbeatRequestState.resetTimer();
         this.membershipManager.onHeartbeatResponseReceived(response.data());
+        if (previousGroupMetadataUdateEvent == null ||
+            previousGroupMetadataUdateEvent.memberEpoch() != membershipManager.memberEpoch()) {
+
+            final GroupMetadataUpdateEvent currentGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+                membershipManager.memberEpoch(),
+                previousGroupMetadataUdateEvent == null ? membershipManager.memberId() : previousGroupMetadataUdateEvent.memberId()
+            );
+            this.backgroundEventHandler.add(currentGroupMetadataUpdateEvent);
+            previousGroupMetadataUdateEvent = currentGroupMetadataUpdateEvent;
+        }

Review Comment:
   Can't the `memberId` change when I rejoin the group?
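
To make the concern concrete, here is a minimal sketch that assumes a rejoin can hand the member a new ID. `MemberIdentityTracker` and its methods are hypothetical names for illustration, not Kafka code. Unlike the hunk quoted above, it compares both the member epoch and the member ID before deciding to publish, so a changed ID is not masked by a cached one.

```java
import java.util.Objects;

// Hypothetical tracker, not Kafka code: remembers the last published
// (memberEpoch, memberId) pair and reports whether a new pair differs.
final class MemberIdentityTracker {
    private Integer lastEpoch;   // null until the first pair was published
    private String lastMemberId;

    // Returns true if the pair should be published, i.e. it is the first
    // pair seen or either field differs from the last published pair.
    boolean shouldPublish(int memberEpoch, String memberId) {
        boolean changed = lastEpoch == null
            || lastEpoch != memberEpoch
            || !Objects.equals(lastMemberId, memberId);
        if (changed) {
            lastEpoch = memberEpoch;
            lastMemberId = memberId;
        }
        return changed;
    }

    String lastMemberId() {
        return lastMemberId;
    }

    public static void main(String[] args) {
        MemberIdentityTracker tracker = new MemberIdentityTracker();
        System.out.println(tracker.shouldPublish(1, "member-a")); // first pair
        System.out.println(tracker.shouldPublish(1, "member-a")); // unchanged
        System.out.println(tracker.shouldPublish(1, "member-b")); // new ID, same epoch
    }
}
```

With an epoch-only comparison, the third call in `main` would not publish anything, and the application thread would keep the old member ID.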






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-04 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1414512071


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -292,6 +297,90 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) {
         assertNull(heartbeatRequest.data().subscribedTopicRegex());
     }
 
+    @Test
+    public void testConsumerGroupMetadataFirstUpdate() {
+        resetWithZeroHeartbeatInterval(Optional.empty());
+        mockStableMember();
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", )));
+
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
+        ClientResponse response = createHeartbeatResponse(request, Errors.NONE);
+        result.unsentRequests.get(0).handler().onComplete(response);
+        assertEquals(1, backgroundEventQueue.size());
+        final BackgroundEvent event = backgroundEventQueue.poll();
+        assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type());
+        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = (GroupMetadataUpdateEvent) event;
+        final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+            memberEpoch,
+            memberId
+        );
+        assertEquals(expectedGroupMetadataUpdateEvent, groupMetadataUpdateEvent);
+    }
+
+    @Test
+    public void testConsumerGroupMetadataUpdateWithSameUpdate() {
+        resetWithZeroHeartbeatInterval(Optional.empty());
+        mockStableMember();
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", )));
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
+        ClientResponse firstResponse = createHeartbeatResponse(request, Errors.NONE);
+        request.handler().onComplete(firstResponse);
+        assertEquals(1, backgroundEventQueue.size());
+        final BackgroundEvent firstEvent = backgroundEventQueue.poll();
+        assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, firstEvent.type());
+
+        time.sleep(2000);

Review Comment:
   @kirktrue Although I call `resetWithZeroHeartbeatInterval()` at the
   beginning of this test method, I need to sleep at least 1000 ms to get a
   second heartbeat response. I thought `resetWithZeroHeartbeatInterval()` set
   the heartbeat interval to zero, but during debugging I learnt that the
   heartbeat interval is still 1000 ms. Is this intended?
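
The behaviour described here can be reproduced with a small timer sketch. `ManualClock` and `HeartbeatTimer` are hypothetical stand-ins written for this illustration; they are not Kafka's mock time or heartbeat request state. The sketch only shows why a test has to advance the clock by at least the effective interval (assumed to be 1000 ms, as observed during debugging) before a second heartbeat can be sent.

```java
// Hypothetical manual clock for tests; it advances only when sleep() is called.
final class ManualClock {
    private long nowMs;

    long milliseconds() {
        return nowMs;
    }

    void sleep(long ms) {
        nowMs += ms;
    }
}

// Hypothetical heartbeat timer: the first heartbeat may be sent immediately,
// later ones only after the interval has elapsed since the previous send.
final class HeartbeatTimer {
    private final long intervalMs;
    private long lastSentMs;
    private boolean sentOnce;

    HeartbeatTimer(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    boolean canSend(long nowMs) {
        return !sentOnce || nowMs - lastSentMs >= intervalMs;
    }

    void onSent(long nowMs) {
        sentOnce = true;
        lastSentMs = nowMs;
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        HeartbeatTimer timer = new HeartbeatTimer(1000L);
        timer.onSent(clock.milliseconds());
        System.out.println(timer.canSend(clock.milliseconds())); // interval not elapsed yet
        clock.sleep(1000L);
        System.out.println(timer.canSend(clock.milliseconds())); // interval elapsed
    }
}
```

With an interval of zero, `canSend` in this sketch is true right after `onSent`, which is the behaviour the helper's name suggests.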






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-04 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1414403764


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -232,6 +235,16 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr
         this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
         this.heartbeatRequestState.resetTimer();
         this.membershipManager.onHeartbeatResponseReceived(response.data());
+        if (previousGroupMetadataUdateEvent == null ||
+            previousGroupMetadataUdateEvent.memberEpoch() != membershipManager.memberEpoch()) {
+
+            final GroupMetadataUpdateEvent currentGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+                membershipManager.memberEpoch(),
+                previousGroupMetadataUdateEvent == null ? membershipManager.memberId() : previousGroupMetadataUdateEvent.memberId()
+            );
+            this.backgroundEventHandler.add(currentGroupMetadataUpdateEvent);
+            previousGroupMetadataUdateEvent = currentGroupMetadataUpdateEvent;
+        }

Review Comment:
   @dajac @lucasbru @kirktrue @AndrewJSchofield Could you please sanity check
   this code snippet?
   The heartbeat request manager should only send a group metadata update
   event in two cases: when it receives the first heartbeat response (i.e.,
   `previousGroupMetadataUdateEvent == null`), because that response sets the
   member ID and the member ID never changes afterwards, or when the member
   epoch changed in the last heartbeat response (i.e.,
   `previousGroupMetadataUdateEvent.memberEpoch() != membershipManager.memberEpoch()`).
   
   Does this make sense?
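
The rule described in this comment can be sketched outside of Kafka as follows. `MetadataUpdate` and `EpochChangeGate` are hypothetical names used only for illustration. Like the snippet under review, the sketch takes the member ID from the first response and reuses it afterwards, which is exactly the assumption being put up for a sanity check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the event; not Kafka's GroupMetadataUpdateEvent.
final class MetadataUpdate {
    final int memberEpoch;
    final String memberId;

    MetadataUpdate(int memberEpoch, String memberId) {
        this.memberEpoch = memberEpoch;
        this.memberId = memberId;
    }
}

// Emits an update only for the first heartbeat response or when the member
// epoch differs from the one in the previously emitted update.
final class EpochChangeGate {
    private MetadataUpdate previous; // null until the first response arrives
    final List<MetadataUpdate> emitted = new ArrayList<>();

    void onHeartbeatResponse(int memberEpoch, String memberId) {
        if (previous == null || previous.memberEpoch != memberEpoch) {
            // Mirrors the reviewed snippet: the member ID comes from the
            // first response and is reused for all later updates.
            String id = previous == null ? memberId : previous.memberId;
            previous = new MetadataUpdate(memberEpoch, id);
            emitted.add(previous);
        }
    }

    public static void main(String[] args) {
        EpochChangeGate gate = new EpochChangeGate();
        gate.onHeartbeatResponse(1, "member-a"); // first response, emitted
        gate.onHeartbeatResponse(1, "member-a"); // same epoch, nothing emitted
        gate.onHeartbeatResponse(2, "member-a"); // epoch changed, emitted
        System.out.println(gate.emitted.size()); // prints 2
    }
}
```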






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-04 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1413915651


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -232,6 +233,12 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr
         this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
         this.heartbeatRequestState.resetTimer();
         this.membershipManager.onHeartbeatResponseReceived(response.data());
+        this.backgroundEventHandler.add(new GroupMetadataUpdateEvent(

Review Comment:
   Yeah, I wanted to leave that as a follow-up improvement, but I think it is
   simple enough to include right away.






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-04 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1413910032


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -131,6 +133,77 @@ public class AsyncKafkaConsumer implements 
ConsumerDelegate {
 
 private static final long NO_CURRENT_THREAD = -1L;
 
+/**
+ * An {@link 
org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is 
created and executes in the
+ * application thread for the purpose of processing {@link BackgroundEvent 
background events} generated by the
+ * {@link ConsumerNetworkThread network thread}.
+ * Those events are generally of two types:
+ *
+ * 
+ * Errors that occur in the network thread that need to be 
propagated to the application thread
+ * {@link ConsumerRebalanceListener} callbacks that are to be 
executed on the application thread
+ * 
+ */
+public class BackgroundEventProcessor extends 
EventProcessor {
+
+public BackgroundEventProcessor(final LogContext logContext,
+final BlockingQueue 
backgroundEventQueue) {
+super(logContext, backgroundEventQueue);
+}
+
+/**
+ * Process the events—if any—that were produced by the {@link 
ConsumerNetworkThread network thread}.
+ * It is possible that {@link 
org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an 
error}
+ * could occur when processing the events. In such cases, the 
processor will take a reference to the first
+ * error, continue to process the remaining events, and then throw the 
first error that occurred.
+ */
+@Override
+public void process() {
+AtomicReference firstError = new 
AtomicReference<>();
+process((event, error) -> firstError.compareAndSet(null, error));
+
+if (firstError.get() != null) {
+throw firstError.get();
+}
+}
+
+@Override
+public void process(final BackgroundEvent event) {
+switch (event.type()) {
+case ERROR:
+process((ErrorBackgroundEvent) event);
+break;
+case GROUP_METADATA_UPDATE:
+process((GroupMetadataUpdateEvent) event);
+break;
+default:
+throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
+
+}
+}
+
+@Override
+protected Class getEventClass() {
+return BackgroundEvent.class;
+}
+
+private void process(final ErrorBackgroundEvent event) {
+throw event.error();
+}
+
+private void process(final GroupMetadataUpdateEvent event) {
+if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+final ConsumerGroupMetadata currentGroupMetadata = 
AsyncKafkaConsumer.this.groupMetadata.get();
+AsyncKafkaConsumer.this.groupMetadata = Optional.of(new 
ConsumerGroupMetadata(
+event.groupId(),

Review Comment:
   Yes, we do. We could verify that the group ID and the group instance ID are
   equal to catch bugs. Or we could just send the updatable fields to the
   application thread.
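
The second option, sending only the fields that can change, could look roughly like the sketch below. `SimpleGroupMetadata` is a hypothetical, simplified type and not Kafka's `ConsumerGroupMetadata`. The point is that the group ID and the group instance ID stay on the application side and are copied over unchanged, so an update cannot overwrite them.

```java
import java.util.Optional;

// Hypothetical, simplified metadata holder; not Kafka's ConsumerGroupMetadata.
final class SimpleGroupMetadata {
    final String groupId;
    final int memberEpoch;
    final String memberId;
    final Optional<String> groupInstanceId;

    SimpleGroupMetadata(String groupId,
                        int memberEpoch,
                        String memberId,
                        Optional<String> groupInstanceId) {
        this.groupId = groupId;
        this.memberEpoch = memberEpoch;
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
    }

    // Copies the fixed fields and replaces only the two fields that the
    // network thread is allowed to update.
    SimpleGroupMetadata withMember(int newMemberEpoch, String newMemberId) {
        return new SimpleGroupMetadata(groupId, newMemberEpoch, newMemberId, groupInstanceId);
    }

    public static void main(String[] args) {
        SimpleGroupMetadata initial =
            new SimpleGroupMetadata("group-1", -1, "", Optional.of("instance-1"));
        SimpleGroupMetadata updated = initial.withMember(5, "member-a");
        System.out.println(updated.groupId + " " + updated.memberEpoch + " " + updated.memberId);
    }
}
```

An update event would then carry only `memberEpoch` and `memberId`, which also removes the need to verify the other two fields on arrival.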




Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-04 Thread via GitHub


lucasbru commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1413890315


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -232,6 +233,12 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr
         this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
         this.heartbeatRequestState.resetTimer();
         this.membershipManager.onHeartbeatResponseReceived(response.data());
+        this.backgroundEventHandler.add(new GroupMetadataUpdateEvent(

Review Comment:
   Could we only send this if the member epoch or member ID changes? I think
   it's somewhat inelegant that we enqueue this for every single heartbeat. For
   example, with the default `max.poll.interval.ms` of 300 seconds between
   polls and the default heartbeat interval of 3 seconds, we could have 100
   metadata update events to go through in a single poll.
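
The arithmetic behind this estimate is simple enough to write down. The helper below is purely illustrative: `HeartbeatBacklogEstimate` is a hypothetical name, and the two inputs are the values quoted in the comment, not values read from a real consumer configuration.

```java
// Hypothetical helper, not part of Kafka: estimates how many update events
// pile up between two polls if one event is enqueued per heartbeat.
final class HeartbeatBacklogEstimate {

    // Number of heartbeats (and therefore enqueued events) that fit into
    // the longest allowed gap between two polls.
    static long eventsPerPoll(long maxPollIntervalMs, long heartbeatIntervalMs) {
        if (heartbeatIntervalMs <= 0) {
            throw new IllegalArgumentException("heartbeat interval must be positive");
        }
        return maxPollIntervalMs / heartbeatIntervalMs;
    }

    public static void main(String[] args) {
        // Values from the comment: 300 s between polls, 3 s between heartbeats.
        System.out.println(eventsPerPoll(300_000L, 3_000L)); // prints 100
    }
}
```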



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -131,6 +133,77 @@ public class AsyncKafkaConsumer implements 
ConsumerDelegate {
 
 private static final long NO_CURRENT_THREAD = -1L;
 
+/**
+ * An {@link 
org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is 
created and executes in the
+ * application thread for the purpose of processing {@link BackgroundEvent 
background events} generated by the
+ * {@link ConsumerNetworkThread network thread}.
+ * Those events are generally of two types:
+ *
+ * 
+ * Errors that occur in the network thread that need to be 
propagated to the application thread
+ * {@link ConsumerRebalanceListener} callbacks that are to be 
executed on the application thread
+ * 
+ */
+public class BackgroundEventProcessor extends 
EventProcessor {
+
+public BackgroundEventProcessor(final LogContext logContext,
+final BlockingQueue 
backgroundEventQueue) {
+super(logContext, backgroundEventQueue);
+}
+
+/**
+ * Process the events—if any—that were produced by the {@link 
ConsumerNetworkThread network thread}.
+ * It is possible that {@link 
org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an 
error}
+ * could occur when processing the events. In such cases, the 
processor will take a reference to the first
+ * error, continue to process the remaining events, and then throw the 
first error that occurred.
+ */
+@Override
+public void process() {
+AtomicReference firstError = new 
AtomicReference<>();
+process((event, error) -> firstError.compareAndSet(null, error));
+
+if (firstError.get() != null) {
+throw firstError.get();
+}
+}
+
+@Override
+public void process(final BackgroundEvent event) {
+switch (event.type()) {
+case ERROR:
+process((ErrorBackgroundEvent) event);
+break;
+case GROUP_METADATA_UPDATE:
+process((GroupMetadataUpdateEvent) event);
+break;
+default:
+throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
+
+}
+}
+
+@Override
+protected Class getEventClass() {
+return BackgroundEvent.class;
+}
+
+private void process(final ErrorBackgroundEvent event) {
+throw event.error();
+}
+
+private void process(final GroupMetadataUpdateEvent event) {
+if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+final ConsumerGroupMetadata currentGroupMetadata = 
AsyncKafkaConsumer.this.groupMetadata.get();
+AsyncKafkaConsumer.this.groupMetadata = Optional.of(new 
ConsumerGroupMetadata(
+event.groupId(),
+event.memberEpoch(),
+event.memberId() != null ? event.memberId() : 
currentGroupMetadata.memberId(),
+event.groupInstanceId()

Review Comment:
   Same as above: we read it from the config. Does it ever change?




Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-04 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1413776509


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This event is sent by the {@link ConsumerNetworkThread consumer's network thread} to the application thread
+ * so that when the user calls the {@link Consumer#groupMetadata()} API, the information is up-to-date. The
+ * information for the current state of the group member is managed on the consumer network thread and thus
+ * requires this interplay between threads.
+ */
+public class GroupMetadataUpdateEvent extends BackgroundEvent {
+
+final private String groupId;
+final private int memberEpoch;
+final private String memberId;
+final private Optional groupInstanceId;
+
+public GroupMetadataUpdateEvent(final String groupId,
+final int memberEpoch,
+final String memberId,
+final Optional groupInstanceId) {
+super(Type.GROUP_METADATA_UPDATED);

Review Comment:
   Totally agree! I renamed `GroupMetadataUpdatedEvent` to
   `GroupMetadataUpdateEvent` and forgot to rename `GROUP_METADATA_UPDATED`.






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-04 Thread via GitHub


AndrewJSchofield commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1413724390


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This event is sent by the {@link ConsumerNetworkThread consumer's network thread} to the application thread
+ * so that when the user calls the {@link Consumer#groupMetadata()} API, the information is up-to-date. The
+ * information for the current state of the group member is managed on the consumer network thread and thus
+ * requires this interplay between threads.
+ */
+public class GroupMetadataUpdateEvent extends BackgroundEvent {
+
+final private String groupId;
+final private int memberEpoch;
+final private String memberId;
+final private Optional groupInstanceId;
+
+public GroupMetadataUpdateEvent(final String groupId,
+final int memberEpoch,
+final String memberId,
+final Optional groupInstanceId) {
+super(Type.GROUP_METADATA_UPDATED);

Review Comment:
   Sorry for being a naming fundamentalist, but you've named the class
   `GroupMetadataUpdateEvent` and the constant `GROUP_METADATA_UPDATED`. Please
   use the same tense for the class and the constant. I'm sorting out the
   subclasses of `ApplicationEvent` in a similar vein in another PR. It just
   makes the code slightly easier to navigate.






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-04 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1413694237


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##
@@ -49,4 +49,9 @@ public void add(BackgroundEvent event) {
 log.trace("Enqueued event: {}", event);
 backgroundEventQueue.add(event);
 }
+
+// Visible for testing
+public Queue backgroundEventQueue() {
+return backgroundEventQueue;
+}

Review Comment:
   OK, was able to get rid of that abominable piece of code.






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-04 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1413661084


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -398,6 +466,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
                 requestManagersSupplier);
     }
 
+    private Optional initializeGroupMetadata(final ConsumerConfig config,
+                                             final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional initializeGroupMetadata(final String groupId,
+                                             final Optional groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throwInInvalidGroupIdException();

Review Comment:
   Yes, but I also changed the code to get rid of that line.






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


kirktrue commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412683183


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();

Review Comment:
   I think we should leave it only in `poll()` for the time being, yes.









Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


kirktrue commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412683354


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -398,6 +466,38 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 requestManagersSupplier);
     }
 
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config,
+                                                                    final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final String groupId,
+                                                                    final Optional<String> groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throwInInvalidGroupIdException();

Review Comment:
   Oh, the line after `throwInInvalidGroupIdException` is just to appease the 
compiler?
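
A minimal sketch of why such a line can be needed, assuming the helper is an ordinary `void` method that always throws. The hunk does not show the line in question, so the class, the method names and the exception type below are hypothetical stand-ins (the real code throws from `throwInInvalidGroupIdException()`). The Java compiler does not look inside the helper, so it requires a `return` after the call; an explicit `throw` statement does not have that problem.

```java
import java.util.Optional;

final class ThrowHelperSketch {
    // Hypothetical helper: it always throws, but its signature is a plain void method.
    private static void throwInvalidGroupId() {
        throw new IllegalArgumentException("The configured group ID must not be empty");
    }

    // Variant 1: the compiler assumes the helper call can return normally,
    // so it demands a return statement after it.
    static Optional<String> viaHelper(String groupId) {
        if (groupId == null) {
            return Optional.empty();
        } else if (groupId.isEmpty()) {
            throwInvalidGroupId();
            return Optional.empty(); // never reached at runtime, required to compile
        } else {
            return Optional.of(groupId);
        }
    }

    // Variant 2: an explicit throw statement ends the branch, so no extra line is needed.
    static Optional<String> viaThrow(String groupId) {
        if (groupId == null) {
            return Optional.empty();
        } else if (groupId.isEmpty()) {
            throw new IllegalArgumentException("The configured group ID must not be empty");
        } else {
            return Optional.of(groupId);
        }
    }
}
```

Both variants behave the same at runtime: a `null` group ID yields an empty `Optional`, an empty string throws, and anything else is wrapped.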






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412360413


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();
+            return groupMetadata.orElseThrow(
+                () -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!")

Review Comment:
   Actually, `backgroundEventProcessor.process()` sets the group metadata, so 
returning the group metadata from `maybeThrowInvalidGroupIdException` might 
return outdated group metadata. Since I have now removed 
`backgroundEventProcessor.process()`, I will reconsider the exception.






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


AndrewJSchofield commented on PR #14879:
URL: https://github.com/apache/kafka/pull/14879#issuecomment-1836371437

   > > Can you elaborate on the direction to remove the background queue from 
the 'test builder' instead of using the one it constructed?
   > 
   > I had issues with tests using the spy on the `AsyncKafkaConsumer`. More 
precisely, a test failed with the spy but did not fail with a consumer that was 
not wrapped in a spy. BTW, IMO, spies (or partial mocks) should be used very 
carefully. Actually, good code does not need spies (with a few exceptions). 
Spies make it easy to avoid structuring the code well: they do not force one to 
couple objects loosely. Even the [Mockito 
documentation](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#spy)
 warns against its own spies. Additionally, the code under test, i.e., the 
async Kafka consumer, should not be wrapped in a spy. We should test that 
code directly to avoid possible side effects coming from the wrapping or from 
mistakes in specifying stubs on the spy.
   
   I've had exactly the same problem. Nested mocking makes it all very unhappy.





Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on PR #14879:
URL: https://github.com/apache/kafka/pull/14879#issuecomment-1836362310

   > Can you elaborate on the direction to remove the background queue from the 
'test builder' instead of using the one it constructed?
   
   I had issues with tests using the spy on the `AsyncKafkaConsumer`. More 
precisely, a test failed with the spy but did not fail with a consumer that was 
not wrapped in a spy. 
   BTW, IMO, spies (or partial mocks) should be used very carefully. 
Actually, good code does not need spies (with a few exceptions). Spies make it 
easy to avoid structuring the code well: they do not force one to couple 
objects loosely. Even the [Mockito 
documentation](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#spy)
 warns against its own spies. Additionally, the code under test, i.e., the 
async Kafka consumer, should not be wrapped in a spy. We should test that 
code directly to avoid possible side effects coming from the wrapping or from 
mistakes in specifying stubs on the spy. 
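
One way to read the argument above, sketched with plain JDK types: if the object under test receives its collaborator (here a queue) from the caller, a test can hold on to that collaborator and assert on it directly, with no spy wrapped around the object under test. `EventHandlerSketch` and `NoSpyTestSketch` are hypothetical names for illustration and are not the classes touched by this PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical handler that receives its queue from the caller
// instead of creating and hiding it.
final class EventHandlerSketch {
    private final Queue<String> queue;

    EventHandlerSketch(Queue<String> queue) {
        this.queue = queue;
    }

    void add(String event) {
        queue.add(event);
    }
}

final class NoSpyTestSketch {
    // The test owns the queue, so it can inspect it directly: the real handler
    // is exercised and no accessor has to be exposed just for the test.
    static boolean handlerEnqueuesEvent() {
        Queue<String> queue = new ArrayDeque<>();
        EventHandlerSketch handler = new EventHandlerSketch(queue);
        handler.add("error-event");
        return queue.size() == 1 && "error-event".equals(queue.peek());
    }

    public static void main(String[] args) {
        System.out.println(handlerEnqueuesEvent());
    }
}
```

The design choice is that loose coupling is enforced by the constructor, which is the property the comment says spies let one skip.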





Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


AndrewJSchofield commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412179127


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -666,12 +766,16 @@ public Map 
committed(final Set

Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412171456


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java:
##
@@ -51,91 +61,91 @@ public void tearDown() {
         testBuilder.close();
     }
 
-    @Test
-    public void testNoEvents() {
-        assertTrue(backgroundEventQueue.isEmpty());
-        backgroundEventProcessor.process((event, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testSingleEvent() {
-        BackgroundEvent event = new ErrorBackgroundEvent(new RuntimeException("A"));
-        backgroundEventQueue.add(event);
-        assertPeeked(event);
-        backgroundEventProcessor.process((e, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testSingleErrorEvent() {
-        KafkaException error = new KafkaException("error");
-        BackgroundEvent event = new ErrorBackgroundEvent(error);
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error));
-        assertPeeked(event);
-        assertProcessThrows(error);
-    }
-
-    @Test
-    public void testMultipleEvents() {
-        BackgroundEvent event1 = new ErrorBackgroundEvent(new RuntimeException("A"));
-        backgroundEventQueue.add(event1);
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B")));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C")));
-
-        assertPeeked(event1);
-        backgroundEventProcessor.process((event, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testMultipleErrorEvents() {
-        Throwable error1 = new Throwable("error1");
-        KafkaException error2 = new KafkaException("error2");
-        KafkaException error3 = new KafkaException("error3");
-
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
-
-        assertProcessThrows(new KafkaException(error1));
-    }
-
-    @Test
-    public void testMixedEventsWithErrorEvents() {
-        Throwable error1 = new Throwable("error1");
-        KafkaException error2 = new KafkaException("error2");
-        KafkaException error3 = new KafkaException("error3");
-
-        RuntimeException errorToCheck = new RuntimeException("A");
-        backgroundEventQueue.add(new ErrorBackgroundEvent(errorToCheck));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B")));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C")));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("D")));
-
-        assertProcessThrows(new KafkaException(errorToCheck));
-    }
-
-    private void assertPeeked(BackgroundEvent event) {
-        BackgroundEvent peekEvent = backgroundEventQueue.peek();
-        assertNotNull(peekEvent);
-        assertEquals(event, peekEvent);
-    }
-
-    private void assertProcessThrows(Throwable error) {
-        assertFalse(backgroundEventQueue.isEmpty());
-
-        try {
-            backgroundEventProcessor.process();
-            fail("Should have thrown error: " + error);
-        } catch (Throwable t) {
-            assertEquals(error.getClass(), t.getClass());
-            assertEquals(error.getMessage(), t.getMessage());
-        }
-
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
+//@Test

Review Comment:
   You need to have some commented-out code in a draft PR, otherwise GitHub 
won't let you mark it as a draft.
   
   Jokes aside, yes, they will be removed.



Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412169763


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
     private void maybeThrowFencedInstanceException() {
         if (isFenced) {
             throw new FencedInstanceIdException("Get fenced exception for group.instance.id " +

Review Comment:
   Touché  









Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412168452


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -666,12 +766,16 @@ public Map 
committed(final Set
[...]
https://github.com/apache/kafka/pull/14872






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


AndrewJSchofield commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412158807


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
     private void maybeThrowFencedInstanceException() {
         if (isFenced) {
             throw new FencedInstanceIdException("Get fenced exception for group.instance.id " +

Review Comment:
   Please use the `ConsumerConfig` constant for the `"group.instance.id"`.
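
A small sketch of what this request amounts to, using a stand-in constant so that the example compiles without Kafka on the classpath. In Kafka the name is available as `ConsumerConfig.GROUP_INSTANCE_ID_CONFIG`; the class `FencedMessageSketch` and the method `fencedMessage` are hypothetical.

```java
final class FencedMessageSketch {
    // Stand-in for the real constant; in Kafka the value "group.instance.id"
    // is exposed as ConsumerConfig.GROUP_INSTANCE_ID_CONFIG.
    static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";

    // Builds the message from the constant so the config name is spelled in one place.
    static String fencedMessage(String instanceId) {
        return "Get fenced exception for " + GROUP_INSTANCE_ID_CONFIG + " " + instanceId;
    }
}
```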



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();
+            return groupMetadata.orElseThrow(
+                () -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!")

Review Comment:
   You *know* that `groupMetadata` is present because of the earlier 
`maybeThrowInvalidGroupIdException`. I suppose one pattern would be to return 
an unwrapped `ConsumerGroupMetadata` from `maybeThrowInvalidGroupIdException` 
so that you've eliminated the possibility of the `Optional` being empty.
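
The suggested pattern can be sketched as follows, assuming the validation helper is allowed to return the value it has just checked. The class and method names are hypothetical, a `String` stands in for `ConsumerGroupMetadata`, and `IllegalStateException` stands in for the invalid-group-ID exception of the real code.

```java
import java.util.Optional;

final class UnwrapOnValidateSketch {
    // Stand-in for the consumer's optional group metadata field.
    private final Optional<String> groupMetadata;

    UnwrapOnValidateSketch(Optional<String> groupMetadata) {
        this.groupMetadata = groupMetadata;
    }

    // Validates and unwraps in one step, so the caller never handles an empty Optional.
    private String groupMetadataOrThrow() {
        return groupMetadata.orElseThrow(
            () -> new IllegalStateException("No group ID is configured"));
    }

    String groupMetadata() {
        // No second "this is a bug" check is needed after the validation.
        return groupMetadataOrThrow();
    }
}
```

With this shape there is a single failure path, which removes the question of which exception the second `orElseThrow` should raise.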




Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412156215


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##
@@ -49,4 +49,9 @@ public void add(BackgroundEvent event) {
         log.trace("Enqueued event: {}", event);
         backgroundEventQueue.add(event);
     }
+
+    // Visible for testing
+    public Queue<BackgroundEvent> backgroundEventQueue() {
+        return backgroundEventQueue;
+    }

Review Comment:
   I had to take painkillers to bear the pain when I added that method, because 
I totally agree with you, but I did not find another way. I will reconsider. 






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412153908


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();

Review Comment:
   OK, should `backgroundEventProcessor.process()` then only be called in 
`AsyncKafkaConsumer#poll()`?






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412142819


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {

Review Comment:
   That was a temporary change that I forgot to remove.






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-01 Thread via GitHub


cadonna commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412141829


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -398,6 +466,38 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 requestManagersSupplier);
     }
 
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config,
+                                                                    final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final String groupId,
+                                                                    final Optional<String> groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throwInInvalidGroupIdException();

Review Comment:
   Because the group ID cannot be empty if it is set.






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-11-30 Thread via GitHub


kirktrue commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1411200070


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -398,6 +466,38 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 requestManagersSupplier);
     }
 
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config,
+                                                                    final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final String groupId,
+                                                                    final Optional<String> groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throwInInvalidGroupIdException();

Review Comment:
   Why are we throwing this error here?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -131,9 +135,78 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
 
+    /**
+     * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the application thread for the purpose of processing

Review Comment:
   Super nit: reflow the comments around whatever the line length standard is.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();

Review Comment:
   I don't think we want to process _all_ the events here, as this might 
process events (like async commits, etc.) that are unrelated to group 
metadata.
   
   I have this same issue in another PR I'm working on, so I don't have a 
solution yet.
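
To make the concern concrete, here is a hedged sketch of what processing only the related events could look like. It is not what this PR does (later in the thread the reviewers settle on leaving `process()` in `poll()` only), and every name in it is hypothetical; events are modelled as plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

final class SelectiveProcessingSketch {
    // Removes and returns only the events matching the filter.
    // Events that do not match stay queued in their original order.
    static List<String> drainMatching(Queue<String> queue, Predicate<String> filter) {
        List<String> matched = new ArrayList<>();
        for (Iterator<String> it = queue.iterator(); it.hasNext(); ) {
            String event = it.next();
            if (filter.test(event)) {
                matched.add(event);
                it.remove();
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(
            Arrays.asList("commit-1", "metadata-1", "commit-2", "metadata-2"));
        List<String> handled = drainMatching(queue, e -> e.startsWith("metadata"));
        System.out.println("handled=" + handled + " remaining=" + queue);
    }
}
```

A plain "process everything" call would also consume the two commit events, which is exactly the side effect the comment warns about.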



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {

Review Comment:
   This is just for testing, right? Can we remove the `public` qualifier?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();
+            return groupMetadata.orElseThrow(
+                () -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!")

Review Comment:
   I'm not sure what we should throw here. Does the API contract allow for 
`IllegalStateException`?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdatedEvent.java:
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language 

[PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-11-30 Thread via GitHub


cadonna opened a new pull request, #14879:
URL: https://github.com/apache/kafka/pull/14879

   





Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-11-11 Thread via GitHub


dajac commented on code in PR #14672:
URL: https://github.com/apache/kafka/pull/14672#discussion_r1390251511


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -153,6 +154,13 @@ public int memberEpoch() {
         return memberEpoch;
     }
 
+    @Override
+    public ConsumerGroupMetadata groupMetadata() {
+        // TODO: what do we use here, epoch?

Review Comment:
   We should return the member epoch. The `generationId` name is a bit 
annoying, though. I am not sure whether we can do something about it.
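
A sketch of what returning the member epoch could look like, assuming the epoch is simply passed in the slot that is still called `generationId`. `GroupMetadataSketch` is a hypothetical stand-in with the same four fields that `ConsumerGroupMetadata` exposes (`groupId`, `generationId`, `memberId`, `groupInstanceId`), and `MembershipSketch` is a hypothetical stand-in for the membership manager.

```java
import java.util.Optional;

// Hypothetical stand-in mirroring the fields of ConsumerGroupMetadata.
final class GroupMetadataSketch {
    final String groupId;
    final int generationId;
    final String memberId;
    final Optional<String> groupInstanceId;

    GroupMetadataSketch(String groupId, int generationId, String memberId,
                        Optional<String> groupInstanceId) {
        this.groupId = groupId;
        this.generationId = generationId;
        this.memberId = memberId;
        this.groupInstanceId = groupInstanceId;
    }
}

// Hypothetical membership state holding the epoch assigned to this member.
final class MembershipSketch {
    private final String groupId;
    private final String memberId;
    private final int memberEpoch;

    MembershipSketch(String groupId, String memberId, int memberEpoch) {
        this.groupId = groupId;
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
    }

    GroupMetadataSketch groupMetadata() {
        // The member epoch travels in the field that is still named generationId.
        return new GroupMetadataSketch(groupId, memberEpoch, memberId, Optional.empty());
    }
}
```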






Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-10-30 Thread via GitHub


kirktrue commented on PR #14672:
URL: https://github.com/apache/kafka/pull/14672#issuecomment-1786396441

   @philipnee Can you add the `ctr` tag, please?





[PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-10-30 Thread via GitHub


kirktrue opened a new pull request, #14672:
URL: https://github.com/apache/kafka/pull/14672

   This implements the `Consumer.groupMetadata()` API by means of an event 
passed to and fulfilled in the consumer network I/O thread. The application 
thread blocks until this event is processed in the background thread.
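
The approach described here can be sketched with plain JDK concurrency types. This is an illustration under assumptions, not the code of this pull request: `GroupMetadataRequest` and `BlockingEventSketch` are hypothetical names, a `String` stands in for the group metadata, and the five-second timeout is arbitrary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical event that carries a future for its own result.
final class GroupMetadataRequest {
    final CompletableFuture<String> result = new CompletableFuture<>();
}

final class BlockingEventSketch {
    // Queue shared between the application thread and the background thread.
    private final BlockingQueue<GroupMetadataRequest> events = new LinkedBlockingQueue<>();

    // Background thread: waits for one request and fulfils it.
    Thread startBackgroundThread(String currentMetadata) {
        Thread thread = new Thread(() -> {
            try {
                GroupMetadataRequest request = events.take();
                request.result.complete(currentMetadata);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "background-sketch");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    // Application thread: enqueue the request, then block until it is fulfilled.
    String groupMetadata() {
        GroupMetadataRequest request = new GroupMetadataRequest();
        events.add(request);
        try {
            return request.result.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the result", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("The request was not fulfilled", e);
        }
    }

    public static void main(String[] args) {
        BlockingEventSketch sketch = new BlockingEventSketch();
        sketch.startBackgroundThread("group-a/epoch-3");
        System.out.println(sketch.groupMetadata());
    }
}
```

The trade-off of this design is that every call to `groupMetadata()` pays a round trip to the background thread.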

