Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue closed pull request #14672: KAFKA-15281: Implement the groupMetadata Consumer API URL: https://github.com/apache/kafka/pull/14672 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue commented on PR #14672: URL: https://github.com/apache/kafka/pull/14672#issuecomment-1856533985 This was implemented by @cadonna in #14879, so closing this pull request.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna merged PR #14879: URL: https://github.com/apache/kafka/pull/14879
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1415960044 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -292,6 +297,90 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) { assertNull(heartbeatRequest.data().subscribedTopicRegex()); } +@Test +public void testConsumerGroupMetadataFirstUpdate() { +resetWithZeroHeartbeatInterval(Optional.empty()); +mockStableMember(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); + +NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + +assertEquals(1, result.unsentRequests.size()); +NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0); +ClientResponse response = createHeartbeatResponse(request, Errors.NONE); +result.unsentRequests.get(0).handler().onComplete(response); +assertEquals(1, backgroundEventQueue.size()); +final BackgroundEvent event = backgroundEventQueue.poll(); +assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type()); +final GroupMetadataUpdateEvent groupMetadataUpdateEvent = (GroupMetadataUpdateEvent) event; +final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent( +memberEpoch, +memberId +); +assertEquals(expectedGroupMetadataUpdateEvent, groupMetadataUpdateEvent); +} + +@Test +public void testConsumerGroupMetadataUpdateWithSameUpdate() { +resetWithZeroHeartbeatInterval(Optional.empty()); +mockStableMember(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); +NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0); +ClientResponse firstResponse = createHeartbeatResponse(request, Errors.NONE); 
+request.handler().onComplete(firstResponse); +assertEquals(1, backgroundEventQueue.size()); +final BackgroundEvent firstEvent = backgroundEventQueue.poll(); +assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, firstEvent.type()); + +time.sleep(2000); Review Comment: That's the intention of `resetWithZeroHeartbeatInterval()`, yes. At some point it did as it said.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1415421728 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -232,6 +235,16 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); this.heartbeatRequestState.resetTimer(); this.membershipManager.onHeartbeatResponseReceived(response.data()); +if (previousGroupMetadataUdateEvent == null || +previousGroupMetadataUdateEvent.memberEpoch() != membershipManager.memberEpoch()) { + +final GroupMetadataUpdateEvent currentGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent( +membershipManager.memberEpoch(), +previousGroupMetadataUdateEvent == null ? membershipManager.memberId() : previousGroupMetadataUdateEvent.memberId() +); + this.backgroundEventHandler.add(currentGroupMetadataUpdateEvent); +previousGroupMetadataUdateEvent = currentGroupMetadataUpdateEvent; +} Review Comment: I think you are right! Let me change that.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
lucasbru commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1415340659 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -232,6 +233,12 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); this.heartbeatRequestState.resetTimer(); this.membershipManager.onHeartbeatResponseReceived(response.data()); +this.backgroundEventHandler.add(new GroupMetadataUpdateEvent( Review Comment: Creating a small follow-up ticket would have been fine as well, but this is looking good. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -232,6 +235,16 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); this.heartbeatRequestState.resetTimer(); this.membershipManager.onHeartbeatResponseReceived(response.data()); +if (previousGroupMetadataUdateEvent == null || +previousGroupMetadataUdateEvent.memberEpoch() != membershipManager.memberEpoch()) { + +final GroupMetadataUpdateEvent currentGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent( +membershipManager.memberEpoch(), +previousGroupMetadataUdateEvent == null ? membershipManager.memberId() : previousGroupMetadataUdateEvent.memberId() +); + this.backgroundEventHandler.add(currentGroupMetadataUpdateEvent); +previousGroupMetadataUdateEvent = currentGroupMetadataUpdateEvent; +} Review Comment: Can't the `memberId` change when I rejoin the group?
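Editor's sketch for the rejoin question above: one way to cover a member ID that changes on rejoin is to compare both the epoch and the member ID against the last emitted values, rather than the epoch alone. This is a minimal illustration under assumptions, not the code that was merged. `MetadataSnapshot` and `MetadataUpdateFilter` are hypothetical names, and the list `emitted` merely stands in for the background event queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the PR's GroupMetadataUpdateEvent; only the two
// fields discussed in the review are modelled here.
final class MetadataSnapshot {
    final int memberEpoch;
    final String memberId;

    MetadataSnapshot(int memberEpoch, String memberId) {
        this.memberEpoch = memberEpoch;
        this.memberId = memberId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MetadataSnapshot)) {
            return false; // also covers o == null
        }
        MetadataSnapshot other = (MetadataSnapshot) o;
        return memberEpoch == other.memberEpoch && Objects.equals(memberId, other.memberId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(memberEpoch, memberId);
    }
}

// Hypothetical helper, not a Kafka class: remembers the last snapshot it
// emitted and emits a new one only when the epoch or the member ID differs.
final class MetadataUpdateFilter {
    private MetadataSnapshot previous; // null until the first heartbeat response
    final List<MetadataSnapshot> emitted = new ArrayList<>(); // stands in for the event queue

    void onHeartbeatResponse(int memberEpoch, String memberId) {
        MetadataSnapshot current = new MetadataSnapshot(memberEpoch, memberId);
        if (!current.equals(previous)) {
            emitted.add(current);
            previous = current;
        }
    }
}
```

With this shape, a repeated heartbeat response carrying the same epoch and member ID enqueues nothing, while a new epoch or a new member ID each enqueue exactly one update.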
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1414512071 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -292,6 +297,90 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) { assertNull(heartbeatRequest.data().subscribedTopicRegex()); } +@Test +public void testConsumerGroupMetadataFirstUpdate() { +resetWithZeroHeartbeatInterval(Optional.empty()); +mockStableMember(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); + +NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + +assertEquals(1, result.unsentRequests.size()); +NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0); +ClientResponse response = createHeartbeatResponse(request, Errors.NONE); +result.unsentRequests.get(0).handler().onComplete(response); +assertEquals(1, backgroundEventQueue.size()); +final BackgroundEvent event = backgroundEventQueue.poll(); +assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type()); +final GroupMetadataUpdateEvent groupMetadataUpdateEvent = (GroupMetadataUpdateEvent) event; +final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent( +memberEpoch, +memberId +); +assertEquals(expectedGroupMetadataUpdateEvent, groupMetadataUpdateEvent); +} + +@Test +public void testConsumerGroupMetadataUpdateWithSameUpdate() { +resetWithZeroHeartbeatInterval(Optional.empty()); +mockStableMember(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); +NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0); +ClientResponse firstResponse = createHeartbeatResponse(request, Errors.NONE); 
+request.handler().onComplete(firstResponse); +assertEquals(1, backgroundEventQueue.size()); +final BackgroundEvent firstEvent = backgroundEventQueue.poll(); +assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, firstEvent.type()); + +time.sleep(2000); Review Comment: @kirktrue Although I call `resetWithZeroHeartbeatInterval()` at the beginning of this test method, I need to sleep at least 1000 ms to get a second heartbeat response. I thought `resetWithZeroHeartbeatInterval()` sets the heartbeat interval to zero, but during debugging I learned that the heartbeat interval is still 1000 ms. Is this intended?
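Editor's sketch of the timing behaviour described above: with a manually advanced clock, a 1000 ms interval means the second heartbeat only becomes due once the clock has moved at least 1000 ms, whereas a zero interval makes it due immediately. `ManualClock` and `HeartbeatTimer` are hypothetical classes written for this illustration. They are not Kafka's `MockTime` or the PR's heartbeat request state, and the real classes may behave differently.

```java
// Hypothetical manually advanced clock; time only moves when sleep() is called.
final class ManualClock {
    private long nowMs;

    long milliseconds() {
        return nowMs;
    }

    void sleep(long ms) {
        nowMs += ms;
    }
}

// Hypothetical interval timer: the first heartbeat is always due, later ones
// only once intervalMs has elapsed since the last send.
final class HeartbeatTimer {
    private final long intervalMs;
    private long lastSentMs;
    private boolean sentOnce;

    HeartbeatTimer(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    boolean isDue(long nowMs) {
        return !sentOnce || nowMs - lastSentMs >= intervalMs;
    }

    void markSent(long nowMs) {
        sentOnce = true;
        lastSentMs = nowMs;
    }
}
```

Under these assumptions, a test that expects two heartbeats back to back either needs an interval that is really zero or has to advance the clock by the full interval, which matches the `time.sleep(2000)` workaround in the quoted test.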
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1414403764 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -232,6 +235,16 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); this.heartbeatRequestState.resetTimer(); this.membershipManager.onHeartbeatResponseReceived(response.data()); +if (previousGroupMetadataUdateEvent == null || +previousGroupMetadataUdateEvent.memberEpoch() != membershipManager.memberEpoch()) { + +final GroupMetadataUpdateEvent currentGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent( +membershipManager.memberEpoch(), +previousGroupMetadataUdateEvent == null ? membershipManager.memberId() : previousGroupMetadataUdateEvent.memberId() +); + this.backgroundEventHandler.add(currentGroupMetadataUpdateEvent); +previousGroupMetadataUdateEvent = currentGroupMetadataUpdateEvent; +} Review Comment: @dajac @lucasbru @kirktrue @AndrewJSchofield Could you please sanity check this code snippet? The heartbeat request manager should only send a group metadata update event in two cases: if it is the first heartbeat response it gets (i.e., `previousGroupMetadataUdateEvent == null`), because that sets the member ID and the member ID never changes after this, or if the member epoch changed in the last heartbeat response (i.e., `previousGroupMetadataUdateEvent.memberEpoch() != membershipManager.memberEpoch()`). Does this make sense?
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1413915651 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -232,6 +233,12 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); this.heartbeatRequestState.resetTimer(); this.membershipManager.onHeartbeatResponseReceived(response.data()); +this.backgroundEventHandler.add(new GroupMetadataUpdateEvent( Review Comment: Yeah, I wanted to leave that as an improvement, but I think it is simple enough to include it immediately.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1413910032 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -131,6 +133,77 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private static final long NO_CURRENT_THREAD = -1L; +/** + * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the + * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the + * {@link ConsumerNetworkThread network thread}. + * Those events are generally of two types: + * + * + * Errors that occur in the network thread that need to be propagated to the application thread + * {@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread + * + */ +public class BackgroundEventProcessor extends EventProcessor { + +public BackgroundEventProcessor(final LogContext logContext, +final BlockingQueue backgroundEventQueue) { +super(logContext, backgroundEventQueue); +} + +/** + * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}. + * It is possible that {@link org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an error} + * could occur when processing the events. In such cases, the processor will take a reference to the first + * error, continue to process the remaining events, and then throw the first error that occurred. 
+ */ +@Override +public void process() { +AtomicReference firstError = new AtomicReference<>(); +process((event, error) -> firstError.compareAndSet(null, error)); + +if (firstError.get() != null) { +throw firstError.get(); +} +} + +@Override +public void process(final BackgroundEvent event) { +switch (event.type()) { +case ERROR: +process((ErrorBackgroundEvent) event); +break; +case GROUP_METADATA_UPDATE: +process((GroupMetadataUpdateEvent) event); +break; +default: +throw new IllegalArgumentException("Background event type " + event.type() + " was not expected"); + +} +} + +@Override +protected Class getEventClass() { +return BackgroundEvent.class; +} + +private void process(final ErrorBackgroundEvent event) { +throw event.error(); +} + +private void process(final GroupMetadataUpdateEvent event) { +if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) { +final ConsumerGroupMetadata currentGroupMetadata = AsyncKafkaConsumer.this.groupMetadata.get(); +AsyncKafkaConsumer.this.groupMetadata = Optional.of(new ConsumerGroupMetadata( +event.groupId(), Review Comment: Yes, we do. We could verify that the group ID and the group instance ID are equal to catch bugs. Or we could just send the updateable fields to the application thread.
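Editor's sketch of the error handling described in the quoted Javadoc for `process()`: every queued event is processed, only the first failure is remembered, and that failure is rethrown once the queue is empty. The sketch uses plain `Runnable` tasks and `RuntimeException` as stand-ins for background events and their errors. `FirstErrorDrain` is a hypothetical name, and the real `EventProcessor` API is not reproduced here.

```java
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper illustrating "keep the first error, keep processing,
// throw at the end"; it is not the PR's BackgroundEventProcessor.
final class FirstErrorDrain {

    static void drain(Queue<Runnable> tasks) {
        AtomicReference<RuntimeException> firstError = new AtomicReference<>();
        Runnable task;
        while ((task = tasks.poll()) != null) {
            try {
                task.run();
            } catch (RuntimeException e) {
                // compareAndSet only succeeds while the reference is still null,
                // so later failures never overwrite the first one.
                firstError.compareAndSet(null, e);
            }
        }
        if (firstError.get() != null) {
            throw firstError.get();
        }
    }
}
```

The design choice this illustrates is that one failing event does not prevent later events, such as a group metadata update, from being applied before the caller sees the exception.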
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
lucasbru commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1413890315 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -232,6 +233,12 @@ private void onResponse(final ConsumerGroupHeartbeatResponse response, long curr this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); this.heartbeatRequestState.resetTimer(); this.membershipManager.onHeartbeatResponseReceived(response.data()); +this.backgroundEventHandler.add(new GroupMetadataUpdateEvent( Review Comment: Could we only send this if the member epoch or member ID changes? I think it's somewhat inelegant that we enqueue this for every single heartbeat. For example, with the default max.poll.interval.ms = 300 seconds between polls and the default heartbeat.interval = 3 seconds between heartbeats, we would have 300 / 3 = 100 metadata update events to go through in each poll. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -131,6 +133,77 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private static final long NO_CURRENT_THREAD = -1L; +/** + * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the + * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the + * {@link ConsumerNetworkThread network thread}.
+ * Those events are generally of two types: + * + * + * Errors that occur in the network thread that need to be propagated to the application thread + * {@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread + * + */ +public class BackgroundEventProcessor extends EventProcessor { + +public BackgroundEventProcessor(final LogContext logContext, +final BlockingQueue backgroundEventQueue) { +super(logContext, backgroundEventQueue); +} + +/** + * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}. + * It is possible that {@link org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an error} + * could occur when processing the events. In such cases, the processor will take a reference to the first + * error, continue to process the remaining events, and then throw the first error that occurred. + */ +@Override +public void process() { +AtomicReference firstError = new AtomicReference<>(); +process((event, error) -> firstError.compareAndSet(null, error)); + +if (firstError.get() != null) { +throw firstError.get(); +} +} + +@Override +public void process(final BackgroundEvent event) { +switch (event.type()) { +case ERROR: +process((ErrorBackgroundEvent) event); +break; +case GROUP_METADATA_UPDATE: +process((GroupMetadataUpdateEvent) event); +break; +default: +throw new IllegalArgumentException("Background event type " + event.type() + " was not expected"); + +} +} + +@Override +protected Class getEventClass() { +return BackgroundEvent.class; +} + +private void process(final ErrorBackgroundEvent event) { +throw event.error(); +} + +private void process(final GroupMetadataUpdateEvent event) { +if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) { +final ConsumerGroupMetadata currentGroupMetadata = AsyncKafkaConsumer.this.groupMetadata.get(); +AsyncKafkaConsumer.this.groupMetadata = Optional.of(new ConsumerGroupMetadata( +event.groupId(), +event.memberEpoch(), 
+event.memberId() != null ? event.memberId() : currentGroupMetadata.memberId(), +event.groupInstanceId() Review Comment: Same as above: we read it from the config. Does it ever change?
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1413776509 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java: ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; + +import java.util.Objects; +import java.util.Optional; + +/** + * This event is sent by the {@link ConsumerNetworkThread consumer's network thread} to the application thread + * so that when the user calls the {@link Consumer#groupMetadata()} API, the information is up-to-date. The + * information for the current state of the group member is managed on the consumer network thread and thus + * requires this interplay between threads. 
+ */ +public class GroupMetadataUpdateEvent extends BackgroundEvent { + +final private String groupId; +final private int memberEpoch; +final private String memberId; +final private Optional groupInstanceId; + +public GroupMetadataUpdateEvent(final String groupId, +final int memberEpoch, +final String memberId, +final Optional groupInstanceId) { +super(Type.GROUP_METADATA_UPDATED); Review Comment: Totally agree! I renamed `GroupMetadataUpdatedEvent` to `GroupMetadataUpdateEvent` and forgot to rename `GROUP_METADATA_UPDATED`.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
AndrewJSchofield commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1413724390 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java: ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; + +import java.util.Objects; +import java.util.Optional; + +/** + * This event is sent by the {@link ConsumerNetworkThread consumer's network thread} to the application thread + * so that when the user calls the {@link Consumer#groupMetadata()} API, the information is up-to-date. The + * information for the current state of the group member is managed on the consumer network thread and thus + * requires this interplay between threads. 
+ */ +public class GroupMetadataUpdateEvent extends BackgroundEvent { + +final private String groupId; +final private int memberEpoch; +final private String memberId; +final private Optional groupInstanceId; + +public GroupMetadataUpdateEvent(final String groupId, +final int memberEpoch, +final String memberId, +final Optional groupInstanceId) { +super(Type.GROUP_METADATA_UPDATED); Review Comment: Sorry for being a naming fundamentalist, but you've used `GroupMetadataUpdateEvent` for the class and `GROUP_METADATA_UPDATED` for the constant. Please use the same tense for both. I'm sorting out the subclasses of `ApplicationEvent` in a similar vein in another PR. It just makes it slightly easier to navigate the code.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1413694237 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java: ## @@ -49,4 +49,9 @@ public void add(BackgroundEvent event) { log.trace("Enqueued event: {}", event); backgroundEventQueue.add(event); } + +// Visible for testing +public Queue backgroundEventQueue() { +return backgroundEventQueue; +} Review Comment: OK, I was able to get rid of that abominable piece of code.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1413661084 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -398,6 +466,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { requestManagersSupplier); } +private Optional initializeGroupMetadata(final ConsumerConfig config, +final GroupRebalanceConfig groupRebalanceConfig) { +final Optional groupMetadata = initializeGroupMetadata( +groupRebalanceConfig.groupId, +groupRebalanceConfig.groupInstanceId +); +if (!groupMetadata.isPresent()) { +config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); +config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); +} +return groupMetadata; +} + +private Optional initializeGroupMetadata(final String groupId, +final Optional groupInstanceId) { +if (groupId != null) { +if (groupId.isEmpty()) { +throwInInvalidGroupIdException(); Review Comment: Yes, but I also changed the code to get rid of that line.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412683183 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { +this.groupMetadata = Optional.of(groupMetadata); +} + @Override public ConsumerGroupMetadata groupMetadata() { -throw new KafkaException("method not implemented"); +acquireAndEnsureOpen(); +try { +maybeThrowInvalidGroupIdException(); +backgroundEventProcessor.process(); Review Comment: I think we should leave it only in `poll()` for the time being, yes
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412683354 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -398,6 +466,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { requestManagersSupplier); } +private Optional initializeGroupMetadata(final ConsumerConfig config, +final GroupRebalanceConfig groupRebalanceConfig) { +final Optional groupMetadata = initializeGroupMetadata( +groupRebalanceConfig.groupId, +groupRebalanceConfig.groupInstanceId +); +if (!groupMetadata.isPresent()) { +config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); +config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); +} +return groupMetadata; +} + +private Optional initializeGroupMetadata(final String groupId, +final Optional groupInstanceId) { +if (groupId != null) { +if (groupId.isEmpty()) { +throwInInvalidGroupIdException(); Review Comment: Oh, the line after `throwInInvalidGroupIdException` is just to appease the compiler?
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412360413 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { +this.groupMetadata = Optional.of(groupMetadata); +} + @Override public ConsumerGroupMetadata groupMetadata() { -throw new KafkaException("method not implemented"); +acquireAndEnsureOpen(); +try { +maybeThrowInvalidGroupIdException(); +backgroundEventProcessor.process(); +return groupMetadata.orElseThrow( +() -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!") Review Comment: Actually, `backgroundEventProcessor.process();` sets the group metadata, so returning the group metadata from `maybeThrowInvalidGroupIdException` might return outdated group metadata. Since I have now removed `backgroundEventProcessor.process();`, I will reconsider the exception.
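The pattern under discussion here, a guard that validates and also hands back the unwrapped value, could be sketched roughly as follows. This is only an illustration, not the code from the PR: `GroupMetadataHolder`, `requireGroupMetadata`, and the factory methods are hypothetical names, a plain `String` stands in for `ConsumerGroupMetadata`, and `IllegalStateException` stands in for whatever exception the consumer would actually throw.

```java
import java.util.Optional;

// Hypothetical stand-in for the consumer; not Kafka's actual class.
final class GroupMetadataHolder {
    // A String stands in for ConsumerGroupMetadata in this sketch.
    private Optional<String> groupMetadata;

    private GroupMetadataHolder(Optional<String> initial) {
        this.groupMetadata = initial;
    }

    static GroupMetadataHolder withGroup(String metadata) {
        return new GroupMetadataHolder(Optional.of(metadata));
    }

    static GroupMetadataHolder withoutGroup() {
        return new GroupMetadataHolder(Optional.empty());
    }

    // Models an update applied while processing background events.
    void update(String latest) {
        this.groupMetadata = Optional.of(latest);
    }

    // Guard that validates and unwraps in one step, so callers never
    // handle an empty Optional themselves.
    String requireGroupMetadata() {
        return groupMetadata.orElseThrow(
            () -> new IllegalStateException("group metadata requires a configured group ID"));
    }
}
```

The caveat raised in the comment still applies to a guard like this: it returns whatever was stored at the moment it is called, so it has to run after any pending update has been applied, or the caller sees a stale value.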
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
AndrewJSchofield commented on PR #14879: URL: https://github.com/apache/kafka/pull/14879#issuecomment-1836371437 > > Can you elaborate on the direction to remove the background queue from the 'test builder' instead of using the one it constructed? > > I had issues with tests using the spy on the `AsyncKafkaConsumer`. More precisely, a test failed with the spy but did not fail with a consumer not wrapped in a spy. BTW, IMO, spies (or partial mocks) should be used really carefully. Actually, good code does not need spies (with a few exceptions). Spies let you avoid structuring the code well. They do not force one to loosely couple objects. Even the [Mockito documentation](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#spy) warns against its own spies. Additionally, the code under test, i.e., the async Kafka consumer, should not be wrapped in a spy. We should test that code directly to avoid possible side effects coming from the wrapping or from mistakes in specifying stubs on the spy. I've had exactly the same problem. Nested mocking makes it all very unhappy.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on PR #14879: URL: https://github.com/apache/kafka/pull/14879#issuecomment-1836362310 > Can you elaborate on the direction to remove the background queue from the 'test builder' instead of using the one it constructed? I had issues with tests using the spy on the `AsyncKafkaConsumer`. More precisely, a test failed with the spy but did not fail with a consumer not wrapped in a spy. BTW, IMO, spies (or partial mocks) should be used really carefully. Actually, good code does not need spies (with a few exceptions). Spies let you avoid structuring the code well. They do not force one to loosely couple objects. Even the [Mockito documentation](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#spy) warns against its own spies. Additionally, the code under test, i.e., the async Kafka consumer, should not be wrapped in a spy. We should test that code directly to avoid possible side effects coming from the wrapping or from mistakes in specifying stubs on the spy.
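One way to read the argument above is that a test can inject the collaborator it needs to control (here, the background event queue) and exercise the real object directly, with no spy involved. The following is a minimal sketch under that reading; `QueueDrivenConsumer` and `QueueDrivenConsumerCheck` are hypothetical names, events are plain strings, and none of this is the actual `AsyncKafkaConsumer` or its test builder.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified consumer: the queue is injected through the
// constructor instead of being created internally, so a test can feed it.
final class QueueDrivenConsumer {
    private final Queue<String> backgroundEvents;
    private String lastSeen = "none";

    QueueDrivenConsumer(Queue<String> backgroundEvents) {
        this.backgroundEvents = backgroundEvents;
    }

    // Drains every pending event and remembers the most recent one.
    void poll() {
        String event;
        while ((event = backgroundEvents.poll()) != null) {
            lastSeen = event;
        }
    }

    String lastSeen() {
        return lastSeen;
    }
}

// Test-style driver: exercises the real object, no spy or partial mock.
final class QueueDrivenConsumerCheck {
    static String run() {
        Queue<String> queue = new ArrayDeque<>();
        QueueDrivenConsumer consumer = new QueueDrivenConsumer(queue);
        queue.add("metadata-update-1");
        queue.add("metadata-update-2");
        consumer.poll();
        return consumer.lastSeen();
    }
}
```

Because the test owns the queue, it can enqueue events and inspect the resulting state without stubbing any method on the object under test.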
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
AndrewJSchofield commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412179127 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -666,12 +766,16 @@ public Map committed(final Set
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412171456 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java: ## @@ -51,91 +61,91 @@ public void tearDown() { testBuilder.close(); } -@Test -public void testNoEvents() { -assertTrue(backgroundEventQueue.isEmpty()); -backgroundEventProcessor.process((event, error) -> { }); -assertTrue(backgroundEventQueue.isEmpty()); -} - -@Test -public void testSingleEvent() { -BackgroundEvent event = new ErrorBackgroundEvent(new RuntimeException("A")); -backgroundEventQueue.add(event); -assertPeeked(event); -backgroundEventProcessor.process((e, error) -> { }); -assertTrue(backgroundEventQueue.isEmpty()); -} - -@Test -public void testSingleErrorEvent() { -KafkaException error = new KafkaException("error"); -BackgroundEvent event = new ErrorBackgroundEvent(error); -backgroundEventHandler.add(new ErrorBackgroundEvent(error)); -assertPeeked(event); -assertProcessThrows(error); -} - -@Test -public void testMultipleEvents() { -BackgroundEvent event1 = new ErrorBackgroundEvent(new RuntimeException("A")); -backgroundEventQueue.add(event1); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B"))); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C"))); - -assertPeeked(event1); -backgroundEventProcessor.process((event, error) -> { }); -assertTrue(backgroundEventQueue.isEmpty()); -} - -@Test -public void testMultipleErrorEvents() { -Throwable error1 = new Throwable("error1"); -KafkaException error2 = new KafkaException("error2"); -KafkaException error3 = new KafkaException("error3"); - -backgroundEventHandler.add(new ErrorBackgroundEvent(error1)); -backgroundEventHandler.add(new ErrorBackgroundEvent(error2)); -backgroundEventHandler.add(new ErrorBackgroundEvent(error3)); - -assertProcessThrows(new KafkaException(error1)); -} - -@Test -public void testMixedEventsWithErrorEvents() { -Throwable 
error1 = new Throwable("error1"); -KafkaException error2 = new KafkaException("error2"); -KafkaException error3 = new KafkaException("error3"); - -RuntimeException errorToCheck = new RuntimeException("A"); -backgroundEventQueue.add(new ErrorBackgroundEvent(errorToCheck)); -backgroundEventHandler.add(new ErrorBackgroundEvent(error1)); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B"))); -backgroundEventHandler.add(new ErrorBackgroundEvent(error2)); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C"))); -backgroundEventHandler.add(new ErrorBackgroundEvent(error3)); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("D"))); - -assertProcessThrows(new KafkaException(errorToCheck)); -} - -private void assertPeeked(BackgroundEvent event) { -BackgroundEvent peekEvent = backgroundEventQueue.peek(); -assertNotNull(peekEvent); -assertEquals(event, peekEvent); -} - -private void assertProcessThrows(Throwable error) { -assertFalse(backgroundEventQueue.isEmpty()); - -try { -backgroundEventProcessor.process(); -fail("Should have thrown error: " + error); -} catch (Throwable t) { -assertEquals(error.getClass(), t.getClass()); -assertEquals(error.getMessage(), t.getMessage()); -} - -assertTrue(backgroundEventQueue.isEmpty()); -} +//@Test Review Comment: You need to have that commented-out code in a draft PR, otherwise GitHub won't let you mark it as a draft. Jokes aside, yes, it will be removed.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412169763 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() { private void maybeThrowFencedInstanceException() { if (isFenced) { throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + Review Comment: Touché
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412168452 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -666,12 +766,16 @@ public Map committed(final Sethttps://github.com/apache/kafka/pull/14872
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
AndrewJSchofield commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412158807 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() { private void maybeThrowFencedInstanceException() { if (isFenced) { throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + Review Comment: Please use the `ConsumerConfig` constant for the `"group.instance.id"`. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { +this.groupMetadata = Optional.of(groupMetadata); +} + @Override public ConsumerGroupMetadata groupMetadata() { -throw new KafkaException("method not implemented"); +acquireAndEnsureOpen(); +try { +maybeThrowInvalidGroupIdException(); +backgroundEventProcessor.process(); +return groupMetadata.orElseThrow( +() -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!") Review Comment: You *know* that groupMetadata is present because of the earlier `maybeThrowInvalidGroupIdException`. I suppose one pattern would be to return an unwrapped `GroupMetadata` from `maybeThrowInvalidGroupIdException` so that you've eliminated the possibility of the `Optional` being empty. 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -666,12 +766,16 @@ public Map committed(final Set { }); -assertTrue(backgroundEventQueue.isEmpty()); -} - -@Test -public void testSingleEvent() { -BackgroundEvent event = new ErrorBackgroundEvent(new RuntimeException("A")); -backgroundEventQueue.add(event); -assertPeeked(event); -backgroundEventProcessor.process((e, error) -> { }); -assertTrue(backgroundEventQueue.isEmpty()); -} - -@Test -public void testSingleErrorEvent() { -KafkaException error = new KafkaException("error"); -BackgroundEvent event = new ErrorBackgroundEvent(error); -backgroundEventHandler.add(new ErrorBackgroundEvent(error)); -assertPeeked(event); -assertProcessThrows(error); -} - -@Test -public void testMultipleEvents() { -BackgroundEvent event1 = new ErrorBackgroundEvent(new RuntimeException("A")); -backgroundEventQueue.add(event1); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B"))); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C"))); - -assertPeeked(event1); -backgroundEventProcessor.process((event, error) -> { }); -assertTrue(backgroundEventQueue.isEmpty()); -} - -@Test -public void testMultipleErrorEvents() { -Throwable error1 = new Throwable("error1"); -KafkaException error2 = new KafkaException("error2"); -KafkaException error3 = new KafkaException("error3"); - -backgroundEventHandler.add(new ErrorBackgroundEvent(error1)); -backgroundEventHandler.add(new ErrorBackgroundEvent(error2)); -backgroundEventHandler.add(new ErrorBackgroundEvent(error3)); - -assertProcessThrows(new KafkaException(error1)); -} - -@Test -public void testMixedEventsWithErrorEvents() { -Throwable error1 = new Throwable("error1"); -KafkaException error2 = new KafkaException("error2"); -KafkaException error3 = new KafkaException("error3"); - -RuntimeException errorToCheck = new RuntimeException("A"); -backgroundEventQueue.add(new ErrorBackgroundEvent(errorToCheck)); 
-backgroundEventHandler.add(new ErrorBackgroundEvent(error1)); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B"))); -backgroundEventHandler.add(new ErrorBackgroundEvent(error2)); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C"))); -backgroundEventHandler.add(new ErrorBackgroundEvent(error3)); -backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("D"))); - -assertProcessThrows(new KafkaException(errorToCheck)); -} - -private void assertPeeked(BackgroundEvent event) { -BackgroundEvent peekEvent = backgroundEventQueue.peek(); -assertNotNull(peekEvent); -assertEquals(event, peekEvent); -} - -private void assertProcessThrows(Throwable error) { -assertFalse(backgroundEventQueue.isEmpty()); - -try { -backgroundEventProcessor.process(); -fail("Should have
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412156215 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java: ## @@ -49,4 +49,9 @@ public void add(BackgroundEvent event) { log.trace("Enqueued event: {}", event); backgroundEventQueue.add(event); } + +// Visible for testing +public Queue backgroundEventQueue() { +return backgroundEventQueue; +} Review Comment: I had to take painkillers to stand the pain when I added that method because I totally agree with you but I did not find another way. I will reconsider.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412153908 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { +this.groupMetadata = Optional.of(groupMetadata); +} + @Override public ConsumerGroupMetadata groupMetadata() { -throw new KafkaException("method not implemented"); +acquireAndEnsureOpen(); +try { +maybeThrowInvalidGroupIdException(); +backgroundEventProcessor.process(); Review Comment: OK, should `backgroundEventProcessor.process()` then only be called in `AsyncKafkaConsumer#poll()`?
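The option being asked about, draining background events only in `poll()` and letting `groupMetadata()` return whatever was last applied, could look roughly like the sketch below. It is an assumption-laden illustration rather than the merged code: `PollDrivenConsumer`, `EpochUpdate`, `enqueue`, and `groupMetadataEpoch` are hypothetical names, and a bare member epoch stands in for the full group metadata.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical simplification of a consumer whose background events are
// applied only from poll().
final class PollDrivenConsumer {
    // Hypothetical event carrying the member epoch reported by the background thread.
    static final class EpochUpdate {
        final int memberEpoch;

        EpochUpdate(int memberEpoch) {
            this.memberEpoch = memberEpoch;
        }
    }

    private final Queue<EpochUpdate> backgroundEvents = new ConcurrentLinkedQueue<>();
    private int cachedEpoch = -1; // hypothetical "unknown" initial value

    // Called from the background thread in this model.
    void enqueue(int memberEpoch) {
        backgroundEvents.add(new EpochUpdate(memberEpoch));
    }

    // The only place where queued events are applied.
    void poll() {
        EpochUpdate update;
        while ((update = backgroundEvents.poll()) != null) {
            cachedEpoch = update.memberEpoch;
        }
    }

    // Does not touch the queue; returns the value last applied by poll().
    int groupMetadataEpoch() {
        return cachedEpoch;
    }
}
```

The trade-off is visible in the sketch: unrelated events are never processed as a side effect of reading the metadata, but the value returned can lag behind the background thread until the next `poll()`.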
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412142819 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { Review Comment: That was a temporary change that I forgot to remove.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412141829 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -398,6 +466,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { requestManagersSupplier); } +private Optional initializeGroupMetadata(final ConsumerConfig config, +final GroupRebalanceConfig groupRebalanceConfig) { +final Optional groupMetadata = initializeGroupMetadata( +groupRebalanceConfig.groupId, +groupRebalanceConfig.groupInstanceId +); +if (!groupMetadata.isPresent()) { +config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); +config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); +} +return groupMetadata; +} + +private Optional initializeGroupMetadata(final String groupId, +final Optional groupInstanceId) { +if (groupId != null) { +if (groupId.isEmpty()) { +throwInInvalidGroupIdException(); Review Comment: Because the group ID cannot be empty if it is set.
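The rule stated in this reply distinguishes three cases: no group ID configured, an empty group ID, and a usable one. A minimal sketch of that three-way check follows. It is not the PR's `initializeGroupMetadata`: `GroupIdValidation` is a hypothetical class, the returned `String` stands in for `ConsumerGroupMetadata`, and `IllegalArgumentException` with a made-up message stands in for the exception the consumer throws.

```java
import java.util.Optional;

// Hypothetical helper illustrating the three cases for a configured group ID.
final class GroupIdValidation {
    static Optional<String> initializeGroupMetadata(String groupId) {
        if (groupId == null) {
            // No group ID configured: the consumer simply has no group metadata.
            return Optional.empty();
        }
        if (groupId.isEmpty()) {
            // A group ID that is set must not be empty (stand-in exception and message).
            throw new IllegalArgumentException("group ID must not be empty when it is set");
        }
        // The String stands in for ConsumerGroupMetadata in this sketch.
        return Optional.of(groupId);
    }
}
```

Returning early for the `null` case keeps the throw on its own branch, which avoids needing a dummy statement after it to satisfy the compiler.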
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1411200070 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -398,6 +466,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { requestManagersSupplier); } +private Optional initializeGroupMetadata(final ConsumerConfig config, +final GroupRebalanceConfig groupRebalanceConfig) { +final Optional groupMetadata = initializeGroupMetadata( +groupRebalanceConfig.groupId, +groupRebalanceConfig.groupInstanceId +); +if (!groupMetadata.isPresent()) { +config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); +config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); +} +return groupMetadata; +} + +private Optional initializeGroupMetadata(final String groupId, +final Optional groupInstanceId) { +if (groupId != null) { +if (groupId.isEmpty()) { +throwInInvalidGroupIdException(); Review Comment: Why are we throwing this error here? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -131,9 +135,78 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private static final long NO_CURRENT_THREAD = -1L; +/** + * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the application thread for the purpose of processing Review Comment: Super nit: reflow the comments around whatever the line length standard is. 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { +this.groupMetadata = Optional.of(groupMetadata); +} + @Override public ConsumerGroupMetadata groupMetadata() { -throw new KafkaException("method not implemented"); +acquireAndEnsureOpen(); +try { +maybeThrowInvalidGroupIdException(); +backgroundEventProcessor.process(); Review Comment: I don't think we want to process _all_ the events here, as this might process events (like async commits, et al.) that are unrelated to group metadata. I have this same issue in another PR I'm working on, so I don't have a solution yet ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { Review Comment: This is just for testing, right? Can we remove the `public` qualifier? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } +public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { +this.groupMetadata = Optional.of(groupMetadata); +} + @Override public ConsumerGroupMetadata groupMetadata() { -throw new KafkaException("method not implemented"); +acquireAndEnsureOpen(); +try { +maybeThrowInvalidGroupIdException(); +backgroundEventProcessor.process(); +return groupMetadata.orElseThrow( +() -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!") Review Comment: I'm not sure what we should throw here. Does the API contract allow for `IllegalStateException`? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdatedEvent.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language
[PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna opened a new pull request, #14879: URL: https://github.com/apache/kafka/pull/14879 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
dajac commented on code in PR #14672: URL: https://github.com/apache/kafka/pull/14672#discussion_r1390251511 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -153,6 +154,13 @@ public int memberEpoch() { return memberEpoch; } +@Override +public ConsumerGroupMetadata groupMetadata() { +// TODO: what do we use here, epoch? Review Comment: We should return the member epoch. The generationId name is a bit annoying though. I am not sure if we can do something about it.
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue commented on PR #14672: URL: https://github.com/apache/kafka/pull/14672#issuecomment-1786396441 @philipnee Can you add the `ctr` tag, please?
[PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue opened a new pull request, #14672: URL: https://github.com/apache/kafka/pull/14672 This implements the `Consumer.groupMetadata()` API by means of an event passed to and fulfilled in the consumer network I/O thread. The application thread will block until this event is processed in the background thread.
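The hand-off described in this PR text (the application thread enqueues an event and blocks until the background thread fulfils it) can be modelled with a queue and a future. The sketch below is a simplified illustration and not the code from #14672: `BlockingEventSketch`, `GroupMetadataRequest`, and the `"memberEpoch=3"` payload are hypothetical, and a short-lived daemon thread stands in for the consumer network I/O thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BlockingEventSketch {
    // Hypothetical event: the background thread completes the future.
    static final class GroupMetadataRequest {
        final CompletableFuture<String> result = new CompletableFuture<>();
    }

    static String fetchGroupMetadata() {
        BlockingQueue<GroupMetadataRequest> applicationEvents = new LinkedBlockingQueue<>();

        // Stand-in for the network I/O thread: takes one event and fulfils it.
        Thread background = new Thread(() -> {
            try {
                GroupMetadataRequest request = applicationEvents.take();
                request.result.complete("memberEpoch=3"); // made-up payload
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.setDaemon(true);
        background.start();

        // Application thread: enqueue the event, then block until it is fulfilled.
        GroupMetadataRequest request = new GroupMetadataRequest();
        applicationEvents.add(request);
        try {
            return request.result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("group metadata was not fulfilled in time", e);
        }
    }
}
```

A bounded wait is used here so that a stalled background thread surfaces as an error instead of blocking the caller forever; whether the original PR used a timeout is not stated in this message.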