lucasbru commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410466885


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -398,6 +387,38 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 requestManagersSupplier);
     }
 
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final 
ConsumerConfig config,
+                                                                    final 
GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional<ConsumerGroupMetadata> groupMetadata = 
initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final 
String groupId,
+                                                                    final 
Optional<String> groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throwInInvalidGroupIdException();
+                return Optional.empty();

Review Comment:
   Unreachable!



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() {
         assertThrows(IllegalArgumentException.class, () -> 
consumer.subscribe(singletonList(emptyTopic)));
     }
 
+    @Test
+    public void testGroupIdIsNull() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer<String, String> consumer =
+                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), 
new StringDeserializer())) {
+            
assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            
assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not 
expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdIsNotNullAndValid() {

Review Comment:
   I'd personally drop the `Is`, to be consistent with the tests below. Also 
above.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -672,12 +693,16 @@ public Map<TopicPartition, OffsetAndMetadata> 
committed(final Set<TopicPartition
     }
 
     private void maybeThrowInvalidGroupIdException() {
-        if (!groupId.isPresent() || groupId.get().isEmpty()) {
-            throw new InvalidGroupIdException("To use the group management or 
offset commit APIs, you must " +
-                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in 
the consumer configuration.");
+        if (!groupMetadata.isPresent()) {
+            throwInInvalidGroupIdException();
         }
     }
 
+    private void throwInInvalidGroupIdException() {
+        throw new InvalidGroupIdException("To use the group management or 
offset commit APIs, you must " +

Review Comment:
   Kirks' PR gives nicely specific error messages (present, but cannot be 
empty). I would just inline this. Especially because this is something that 
people will run into during migration (and it was allowed before), that would 
be nice to have.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() {
         assertThrows(IllegalArgumentException.class, () -> 
consumer.subscribe(singletonList(emptyTopic)));
     }
 
+    @Test
+    public void testGroupIdIsNull() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer<String, String> consumer =
+                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), 
new StringDeserializer())) {
+            
assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            
assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not 
expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdIsNotNullAndValid() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer<String, String> consumer =
+                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), 
new StringDeserializer())) {
+            
assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            
assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not 
expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdEmpty() {
+        testGroupId("");
+    }
+
+    @Test
+    public void testGroupIdOnlyWhitespaces() {
+        testGroupId("       ");
+    }
+
+    private void testGroupId(final String groupId) {

Review Comment:
   `testInvalidGroupId`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1370,7 +1395,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
     private void maybeThrowFencedInstanceException() {
         if (isFenced) {
             throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id " +
-                groupInstanceId.orElse("null"));
+                groupMetadata.orElseThrow(
+                    () -> new IllegalStateException("No group metadata found 
although a group ID was provided. This is a bug!")

Review Comment:
   Is it important to crash here? Otherwise I'd consider just logging and error 
and returning.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -672,12 +693,16 @@ public Map<TopicPartition, OffsetAndMetadata> 
committed(final Set<TopicPartition
     }
 
     private void maybeThrowInvalidGroupIdException() {
-        if (!groupId.isPresent() || groupId.get().isEmpty()) {
-            throw new InvalidGroupIdException("To use the group management or 
offset commit APIs, you must " +
-                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in 
the consumer configuration.");
+        if (!groupMetadata.isPresent()) {
+            throwInInvalidGroupIdException();

Review Comment:
   Why "InInvalid" group? Drop the `In`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to