kirktrue commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1411200070


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -398,6 +466,38 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 requestManagersSupplier);
     }
 
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final 
ConsumerConfig config,
+                                                                    final 
GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional<ConsumerGroupMetadata> groupMetadata = 
initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final 
String groupId,
+                                                                    final 
Optional<String> groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throwInInvalidGroupIdException();

Review Comment:
   Why are we throwing this error here?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -131,9 +135,78 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
 
+    /**
+     * An {@link 
org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is 
created and executes in the application thread for the purpose of processing

Review Comment:
   Super nit: reflow the comments around whatever the line length standard is.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition 
topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();

Review Comment:
   I don't think we want to process _all_ the events here, as this might 
process events (like async commits, et al.) that are unrelated to group 
metadata.
   
   I have this same issue in another PR I'm working on, so I don't have a 
solution yet 😞 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition 
topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {

Review Comment:
   This is just for testing, right? Can we remove the `public` qualifier?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition 
topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();
+            return groupMetadata.orElseThrow(
+                () -> new IllegalStateException("No group metadata found 
although a valid group ID exists. This is a bug!")

Review Comment:
   I'm not sure what we should throw here. Does the API contract allow for 
`IllegalStateException`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdatedEvent.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+
+import java.util.Objects;
+
+/**
+ * This event is sent by the {@link ConsumerNetworkThread consumer's network 
thread} to the application thread
+ * so that when the user calls the {@link Consumer#groupMetadata()} API, the 
information is up-to-date. The
+ * information for the current state of the group member is managed on the 
consumer network thread and thus
+ * requires this interplay between threads.
+ */
+public class GroupMetadataUpdatedEvent extends BackgroundEvent {
+
+  private final ConsumerGroupMetadata groupMetadata;
+
+  public GroupMetadataUpdatedEvent(ConsumerGroupMetadata groupMetadata) {
+    super(Type.GROUP_METADATA_UPDATED);
+    this.groupMetadata = Objects.requireNonNull(groupMetadata, 
"ConsumerGroupMetadata must be non-null");
+  }
+
+  public ConsumerGroupMetadata groupMetadata() {
+    return groupMetadata;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+
+    GroupMetadataUpdatedEvent that = (GroupMetadataUpdatedEvent) o;
+
+    return groupMetadata.equals(that.groupMetadata);

Review Comment:
   I just double-checked that `ConsumerGroupMetadata` has valid `hashCode()` 
and `equals()` methods defined, and it does, so we're 👍 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java:
##########
@@ -49,4 +49,9 @@ public void add(BackgroundEvent event) {
         log.trace("Enqueued event: {}", event);
         backgroundEventQueue.add(event);
     }
+
+    // Visible for testing
+    public Queue<BackgroundEvent> backgroundEventQueue() {
+        return backgroundEventQueue;
+    }

Review Comment:
   I'm not terribly crazy about this approach as it can lead to circumvention 
of the logic. I understand it's just for tests, for which we do have the 
`ConsumerTestBuilder` approach which allows tests to get access to these kinds 
of objects directly.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
     private void maybeThrowFencedInstanceException() {
         if (isFenced) {
             throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id " +
-                groupInstanceId.orElse("null"));
+                groupMetadata.orElseThrow(
+                    () -> new IllegalStateException("No group metadata found 
although a group ID was provided. This is a bug!")
+                ).groupInstanceId().orElseThrow(
+                    () -> new IllegalStateException("No group instance ID 
found although the consumer is fenced. This is a bug!")
+                ));

Review Comment:
   Can we restructure this so that our exception throwing case doesn't have 
nested exception throwing cases?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition 
topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();

Review Comment:
   I don't think we want to process _all_ the events here, as this might 
process events (like async commits, et al.) that are unrelated to group 
metadata.
   
   I have this same issue in another PR I'm working on, so I don't have a 
solution yet 😞 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdatedEvent.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+
+import java.util.Objects;
+
+/**
+ * This event is sent by the {@link ConsumerNetworkThread consumer's network 
thread} to the application thread
+ * so that when the user calls the {@link Consumer#groupMetadata()} API, the 
information is up-to-date. The
+ * information for the current state of the group member is managed on the 
consumer network thread and thus
+ * requires this interplay between threads.
+ */
+public class GroupMetadataUpdatedEvent extends BackgroundEvent {
+
+  private final ConsumerGroupMetadata groupMetadata;
+
+  public GroupMetadataUpdatedEvent(ConsumerGroupMetadata groupMetadata) {
+    super(Type.GROUP_METADATA_UPDATED);
+    this.groupMetadata = Objects.requireNonNull(groupMetadata, 
"ConsumerGroupMetadata must be non-null");
+  }
+
+  public ConsumerGroupMetadata groupMetadata() {
+    return groupMetadata;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+
+    GroupMetadataUpdatedEvent that = (GroupMetadataUpdatedEvent) o;
+
+    return groupMetadata.equals(that.groupMetadata);

Review Comment:
   I just double-checked that `ConsumerGroupMetadata` has valid `hashCode()` 
and `equals()` methods defined, and it does, so we're 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to