AndrewJSchofield commented on code in PR #14879:
URL: https://github.com/apache/kafka/pull/14879#discussion_r1412158807


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
     private void maybeThrowFencedInstanceException() {
         if (isFenced) {
             throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id " +

Review Comment:
   Please use the `ConsumerConfigs` constant for the `"group.instance.id"`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition 
topicPartition) {
         }
     }
 
+    public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) {
+        this.groupMetadata = Optional.of(groupMetadata);
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            backgroundEventProcessor.process();
+            return groupMetadata.orElseThrow(
+                () -> new IllegalStateException("No group metadata found 
although a valid group ID exists. This is a bug!")

Review Comment:
   You *know* that groupMetadata is present because of the earlier 
`maybeThrowInvalidGroupIdException`. I suppose one pattern would be to return 
an unwrapped `GroupMetadata` from maybeThrowInvalidGroupIdException so that 
you've eliminated the possibility of the `Optional` being empty.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -666,12 +766,16 @@ public Map<TopicPartition, OffsetAndMetadata> 
committed(final Set<TopicPartition
     }
 
     private void maybeThrowInvalidGroupIdException() {
-        if (!groupId.isPresent() || groupId.get().isEmpty()) {
-            throw new InvalidGroupIdException("To use the group management or 
offset commit APIs, you must " +
-                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in 
the consumer configuration.");
+        if (!groupMetadata.isPresent()) {
+            throwInInvalidGroupIdException();
         }
     }
 
+    private void throwInInvalidGroupIdException() {

Review Comment:
   I find the name `throwInInvalidGroupIdException` a bit strange. 
`throwInvalidGroupIdException` seems more usual.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java:
##########
@@ -51,91 +61,91 @@ public void tearDown() {
             testBuilder.close();
     }
 
-    @Test
-    public void testNoEvents() {
-        assertTrue(backgroundEventQueue.isEmpty());
-        backgroundEventProcessor.process((event, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testSingleEvent() {
-        BackgroundEvent event = new ErrorBackgroundEvent(new 
RuntimeException("A"));
-        backgroundEventQueue.add(event);
-        assertPeeked(event);
-        backgroundEventProcessor.process((e, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testSingleErrorEvent() {
-        KafkaException error = new KafkaException("error");
-        BackgroundEvent event = new ErrorBackgroundEvent(error);
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error));
-        assertPeeked(event);
-        assertProcessThrows(error);
-    }
-
-    @Test
-    public void testMultipleEvents() {
-        BackgroundEvent event1 = new ErrorBackgroundEvent(new 
RuntimeException("A"));
-        backgroundEventQueue.add(event1);
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new 
RuntimeException("B")));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new 
RuntimeException("C")));
-
-        assertPeeked(event1);
-        backgroundEventProcessor.process((event, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testMultipleErrorEvents() {
-        Throwable error1 = new Throwable("error1");
-        KafkaException error2 = new KafkaException("error2");
-        KafkaException error3 = new KafkaException("error3");
-
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
-
-        assertProcessThrows(new KafkaException(error1));
-    }
-
-    @Test
-    public void testMixedEventsWithErrorEvents() {
-        Throwable error1 = new Throwable("error1");
-        KafkaException error2 = new KafkaException("error2");
-        KafkaException error3 = new KafkaException("error3");
-
-        RuntimeException errorToCheck = new RuntimeException("A");
-        backgroundEventQueue.add(new ErrorBackgroundEvent(errorToCheck));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new 
RuntimeException("B")));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new 
RuntimeException("C")));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new 
RuntimeException("D")));
-
-        assertProcessThrows(new KafkaException(errorToCheck));
-    }
-
-    private void assertPeeked(BackgroundEvent event) {
-        BackgroundEvent peekEvent = backgroundEventQueue.peek();
-        assertNotNull(peekEvent);
-        assertEquals(event, peekEvent);
-    }
-
-    private void assertProcessThrows(Throwable error) {
-        assertFalse(backgroundEventQueue.isEmpty());
-
-        try {
-            backgroundEventProcessor.process();
-            fail("Should have thrown error: " + error);
-        } catch (Throwable t) {
-            assertEquals(error.getClass(), t.getClass());
-            assertEquals(error.getMessage(), t.getMessage());
-        }
-
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
+//    @Test

Review Comment:
   I suppose the final PR will not have all of these commented lines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to