lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1426388734


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -810,89 +854,87 @@ public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceId
         final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
         final ConsumerConfig config = new ConsumerConfig(props);
-        try (final AsyncKafkaConsumer<String, String> consumer =
-            new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-
-            final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
-
-            assertEquals(groupId, groupMetadata.groupId());
-            assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
-            assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
-            assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
-        }
+        final AsyncKafkaConsumer<String, String> consumer =
+            new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer());
+        final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+        assertEquals(groupId, groupMetadata.groupId());
+        assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
+        assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
+        assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
+        consumer.close(Duration.ZERO);
     }
 
     @Test
     public void testGroupMetadataUpdateSingleCall() {
         final String groupId = "consumerGroupA";
         final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
         final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
-        try (final AsyncKafkaConsumer<String, String> consumer =

Review Comment:
   I didn't mean that these changes should be reverted, only that they should be
discarded when we merge with the AsyncKafkaConsumer refactoring. Is it possible
that these changes were needed to avoid running out of memory (OOM)?
   
   One option would be to rebase this PR on the refactoring, which might
resolve these OOMs.


