AndrewJSchofield commented on code in PR #14879: URL: https://github.com/apache/kafka/pull/14879#discussion_r1412158807
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1365,7 +1482,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() { private void maybeThrowFencedInstanceException() { if (isFenced) { throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + Review Comment: Please use the `ConsumerConfig.GROUP_INSTANCE_ID_CONFIG` constant instead of the literal `"group.instance.id"`. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -849,9 +953,22 @@ public OptionalLong currentLag(TopicPartition topicPartition) { } } + public void setGroupMetadata(final ConsumerGroupMetadata groupMetadata) { + this.groupMetadata = Optional.of(groupMetadata); + } + @Override public ConsumerGroupMetadata groupMetadata() { - throw new KafkaException("method not implemented"); + acquireAndEnsureOpen(); + try { + maybeThrowInvalidGroupIdException(); + backgroundEventProcessor.process(); + return groupMetadata.orElseThrow( + () -> new IllegalStateException("No group metadata found although a valid group ID exists. This is a bug!") Review Comment: You *know* that `groupMetadata` is present because of the earlier `maybeThrowInvalidGroupIdException` call. I suppose one pattern would be to return the unwrapped `ConsumerGroupMetadata` from `maybeThrowInvalidGroupIdException` so that you've eliminated the possibility of the `Optional` being empty. 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -666,12 +766,16 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition } private void maybeThrowInvalidGroupIdException() { - if (!groupId.isPresent() || groupId.get().isEmpty()) { - throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " + - "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration."); + if (!groupMetadata.isPresent()) { + throwInInvalidGroupIdException(); } } + private void throwInInvalidGroupIdException() { Review Comment: I find the name `throwInInvalidGroupIdException` a bit strange. `throwInvalidGroupIdException` seems more usual. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java: ########## @@ -51,91 +61,91 @@ public void tearDown() { testBuilder.close(); } - @Test - public void testNoEvents() { - assertTrue(backgroundEventQueue.isEmpty()); - backgroundEventProcessor.process((event, error) -> { }); - assertTrue(backgroundEventQueue.isEmpty()); - } - - @Test - public void testSingleEvent() { - BackgroundEvent event = new ErrorBackgroundEvent(new RuntimeException("A")); - backgroundEventQueue.add(event); - assertPeeked(event); - backgroundEventProcessor.process((e, error) -> { }); - assertTrue(backgroundEventQueue.isEmpty()); - } - - @Test - public void testSingleErrorEvent() { - KafkaException error = new KafkaException("error"); - BackgroundEvent event = new ErrorBackgroundEvent(error); - backgroundEventHandler.add(new ErrorBackgroundEvent(error)); - assertPeeked(event); - assertProcessThrows(error); - } - - @Test - public void testMultipleEvents() { - BackgroundEvent event1 = new ErrorBackgroundEvent(new RuntimeException("A")); - backgroundEventQueue.add(event1); - backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B"))); - 
backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C"))); - - assertPeeked(event1); - backgroundEventProcessor.process((event, error) -> { }); - assertTrue(backgroundEventQueue.isEmpty()); - } - - @Test - public void testMultipleErrorEvents() { - Throwable error1 = new Throwable("error1"); - KafkaException error2 = new KafkaException("error2"); - KafkaException error3 = new KafkaException("error3"); - - backgroundEventHandler.add(new ErrorBackgroundEvent(error1)); - backgroundEventHandler.add(new ErrorBackgroundEvent(error2)); - backgroundEventHandler.add(new ErrorBackgroundEvent(error3)); - - assertProcessThrows(new KafkaException(error1)); - } - - @Test - public void testMixedEventsWithErrorEvents() { - Throwable error1 = new Throwable("error1"); - KafkaException error2 = new KafkaException("error2"); - KafkaException error3 = new KafkaException("error3"); - - RuntimeException errorToCheck = new RuntimeException("A"); - backgroundEventQueue.add(new ErrorBackgroundEvent(errorToCheck)); - backgroundEventHandler.add(new ErrorBackgroundEvent(error1)); - backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B"))); - backgroundEventHandler.add(new ErrorBackgroundEvent(error2)); - backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C"))); - backgroundEventHandler.add(new ErrorBackgroundEvent(error3)); - backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("D"))); - - assertProcessThrows(new KafkaException(errorToCheck)); - } - - private void assertPeeked(BackgroundEvent event) { - BackgroundEvent peekEvent = backgroundEventQueue.peek(); - assertNotNull(peekEvent); - assertEquals(event, peekEvent); - } - - private void assertProcessThrows(Throwable error) { - assertFalse(backgroundEventQueue.isEmpty()); - - try { - backgroundEventProcessor.process(); - fail("Should have thrown error: " + error); - } catch (Throwable t) { - assertEquals(error.getClass(), t.getClass()); - 
assertEquals(error.getMessage(), t.getMessage()); - } - - assertTrue(backgroundEventQueue.isEmpty()); - } +// @Test Review Comment: I suppose the final PR will not have all of these commented-out lines. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org