kirktrue commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1411200070
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -398,6 +466,38 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { requestManagersSupplier); } + private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config, + final GroupRebalanceConfig groupRebalanceConfig) { + final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata( + groupRebalanceConfig.groupId, + groupRebalanceConfig.groupInstanceId + ); + if (!groupMetadata.isPresent()) { + config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); + config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); + } + return groupMetadata; + } + + private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final String groupId, + final Optional<String> groupInstanceId) { + if (groupId != null) { + if (groupId.isEmpty()) { + throwInInvalidGroupIdException(); Review Comment: Why are we throwing this error here? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -131,9 +135,78 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { private static final long NO_CURRENT_THREAD = -1L; + /** + * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the application thread for the purpose of processing Review Comment: Super nit: reflow the comments around whatever the line length standard is. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } + public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { + this.groupMetadata = Optional.of(groupMetadata); + } + @Override public ConsumerGroupMetadata groupMetadata() { - throw new KafkaException("method not implemented"); + acquireAndEnsureOpen(); + try { + maybeThrowInvalidGroupIdException(); + backgroundEventProcessor.process(); Review Comment: I don't think we want to process _all_ the events here, as this might process events (like async commits, et al.) that are unrelated to group metadata. I have this same issue in another PR I'm working on, so I don't have a solution yet 😞 ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } + public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { Review Comment: This is just for testing, right? Can we remove the `public` qualifier? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } + public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { + this.groupMetadata = Optional.of(groupMetadata); + } + @Override public ConsumerGroupMetadata groupMetadata() { - throw new KafkaException("method not implemented"); + acquireAndEnsureOpen(); + try { + maybeThrowInvalidGroupIdException(); + backgroundEventProcessor.process(); + return groupMetadata.orElseThrow( + () -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!") Review Comment: I'm not sure what we should throw here. Does the API contract allow for `IllegalStateException`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdatedEvent.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; + +import java.util.Objects; + +/** + * This event is sent by the {@link ConsumerNetworkThread consumer's network thread} to the application thread + * so that when the user calls the {@link Consumer#groupMetadata()} API, the information is up-to-date. The + * information for the current state of the group member is managed on the consumer network thread and thus + * requires this interplay between threads. + */ +public class GroupMetadataUpdatedEvent extends BackgroundEvent { + + private final ConsumerGroupMetadata groupMetadata; + + public GroupMetadataUpdatedEvent(ConsumerGroupMetadata groupMetadata) { + super(Type.GROUP_METADATA_UPDATED); + this.groupMetadata = Objects.requireNonNull(groupMetadata, "ConsumerGroupMetadata must be non-null"); + } + + public ConsumerGroupMetadata groupMetadata() { + return groupMetadata; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + + GroupMetadataUpdatedEvent that = (GroupMetadataUpdatedEvent) o; + + return groupMetadata.equals(that.groupMetadata); Review Comment: I just double-checked that `ConsumerGroupMetadata` has valid `hashCode()` and `equals()` methods defined, and it does, so we're 👍 ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java: ########## @@ -49,4 +49,9 @@ public void add(BackgroundEvent event) { log.trace("Enqueued event: {}", event); backgroundEventQueue.add(event); } + + // Visible for testing + public Queue<BackgroundEvent> backgroundEventQueue() { + return backgroundEventQueue; + } Review Comment: I'm not terribly crazy about this approach as it can lead to circumvention of the logic. I understand it's just for tests, for which we do have the `ConsumerTestBuilder` approach which allows tests to get access to these kinds of objects directly. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() { private void maybeThrowFencedInstanceException() { if (isFenced) { throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + - groupInstanceId.orElse("null")); + groupMetadata.orElseThrow( + () -> new IllegalStateException("No group metadata found although a group ID was provided. This is a bug!") + ).groupInstanceId().orElseThrow( + () -> new IllegalStateException("No group instance ID found although the consumer is fenced. This is a bug!") + )); Review Comment: Can we restructure this so that our exception throwing case doesn't have nested exception throwing cases? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } + public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { + this.groupMetadata = Optional.of(groupMetadata); + } + @Override public ConsumerGroupMetadata groupMetadata() { - throw new KafkaException("method not implemented"); + acquireAndEnsureOpen(); + try { + maybeThrowInvalidGroupIdException(); + backgroundEventProcessor.process(); Review Comment: I don't think we want to process _all_ the events here, as this might process events (like async commits, et al.) that are unrelated to group metadata. I have this same issue in another PR I'm working on, so I don't have a solution yet 😞 ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdatedEvent.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; + +import java.util.Objects; + +/** + * This event is sent by the {@link ConsumerNetworkThread consumer's network thread} to the application thread + * so that when the user calls the {@link Consumer#groupMetadata()} API, the information is up-to-date. The + * information for the current state of the group member is managed on the consumer network thread and thus + * requires this interplay between threads. + */ +public class GroupMetadataUpdatedEvent extends BackgroundEvent { + + private final ConsumerGroupMetadata groupMetadata; + + public GroupMetadataUpdatedEvent(ConsumerGroupMetadata groupMetadata) { + super(Type.GROUP_METADATA_UPDATED); + this.groupMetadata = Objects.requireNonNull(groupMetadata, "ConsumerGroupMetadata must be non-null"); + } + + public ConsumerGroupMetadata groupMetadata() { + return groupMetadata; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + + GroupMetadataUpdatedEvent that = (GroupMetadataUpdatedEvent) o; + + return groupMetadata.equals(that.groupMetadata); Review Comment: I just double-checked that `ConsumerGroupMetadata` has valid `hashCode()` and `equals()` methods defined, and it does, so we're 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org