lucasbru commented on code in PR #14779: URL: https://github.com/apache/kafka/pull/14779#discussion_r1395778024
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1102,46 +1207,94 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { subscribeInternal(pattern, Optional.of(listener)); } + /** Review Comment: Mostly copied from the original consumer ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -16,6 +16,9 @@ */ package org.apache.kafka.clients.consumer.internals; Review Comment: Review this file without white space changes. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -112,6 +111,12 @@ public void testInvalidGroupId() { assertThrows(InvalidGroupIdException.class, () -> consumer.committed(new HashSet<>())); } + @Test + public void testFailOnClosedConsumer() { Review Comment: Testing the concurrent modification exception requires a bit of tooling and isn't easy to test in a unit test. It's not tested for the original consumer, so I didn't add a test here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org