[
https://issues.apache.org/jira/browse/KAFKA-20516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sean Quah reopened KAFKA-20516:
-------------------------------
> close(REMAIN_IN_GROUP) does not work for async consumer
> -------------------------------------------------------
>
> Key: KAFKA-20516
> URL: https://issues.apache.org/jira/browse/KAFKA-20516
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: Sean Quah
> Priority: Major
>
> The {{REMAIN_IN_GROUP}} option for {{Consumer.close}} is not respected for
> the async consumer.
> Test:
> {code:java}
> public class PlaintextConsumerCloseTest {
>     ...
>     @ClusterTest
>     public void testClassicConsumerCloseWithRemainInGroup() throws Exception {
>         testCloseWithRemainInGroup(GroupProtocol.CLASSIC);
>     }
>     @ClusterTest
>     public void testAsyncConsumerCloseWithRemainInGroup() throws Exception {
>         testCloseWithRemainInGroup(GroupProtocol.CONSUMER);
>     }
>     private void testCloseWithRemainInGroup(GroupProtocol groupProtocol)
>             throws Exception {
>         var groupId = "group_test";
>         Map<String, Object> consumerConfig = Map.of(
>             GROUP_PROTOCOL_CONFIG,
>             groupProtocol.name().toLowerCase(Locale.ROOT),
>             KEY_DESERIALIZER_CLASS_CONFIG,
>             ByteArrayDeserializer.class.getName(),
>             VALUE_DESERIALIZER_CLASS_CONFIG,
>             ByteArrayDeserializer.class.getName(),
>             AUTO_OFFSET_RESET_CONFIG, "earliest",
>             GROUP_ID_CONFIG, groupId,
>             BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
>         );
>         var topicName = "close-with-remain-in-group";
>         var numRecords = 1;
>         cluster.createTopic(topicName, 2, (short) 2, Map.of());
>         sendRecords(cluster, new TopicPartition(topicName, 0), numRecords);
>         String memberId;
>         try (Consumer<byte[], byte[]> consumer =
>                  cluster.consumer(consumerConfig)) {
>             consumer.subscribe(Set.of(topicName));
>             consumeRecords(consumer, numRecords);
>             memberId = consumer.groupMetadata().memberId();
>             consumer.close(CloseOptions.groupMembershipOperation(
>                 CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
>         }
>         try (Admin admin = cluster.admin()) {
>             var members = admin.describeConsumerGroups(List.of(groupId))
>                 .describedGroups()
>                 .get(groupId)
>                 .get()
>                 .members();
>             assertTrue(
>                 members.stream().anyMatch(m ->
>                     memberId.equals(m.consumerId())),
>                 "Expected member " + memberId + " to still be in " + groupId
>                     + " after close(REMAIN_IN_GROUP), but members were " +
>                     members);
>         }
>     }
> }
> {code}
> Test result:
> {code}
> Gradle Test Run :clients:clients-integration-tests:test > Gradle Test Executor 2 > PlaintextConsumerCloseTest > testAsyncConsumerCloseWithRemainInGroup() > testAsyncConsumerCloseWithRemainInGroup [1] Type=Raft-Isolated, MetadataVersion=4.4-IV0,BrokerSecurityProtocol=PLAINTEXT,BrokerListenerName=ListenerName(EXTERNAL),ControllerSecurityProtocol=PLAINTEXT,ControllerListenerName=ListenerName(CONTROLLER) FAILED
>     org.opentest4j.AssertionFailedError: Expected member S9li7ioVTbW5IsD3C_WWzQ to still be in group_test after close(REMAIN_IN_GROUP), but members were [] ==> expected: <true> but was: <false>
>         at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>         at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>         at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>         at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>         at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
>         at app//org.apache.kafka.clients.consumer.PlaintextConsumerCloseTest.testCloseWithRemainInGroup(PlaintextConsumerCloseTest.java:179)
>         at app//org.apache.kafka.clients.consumer.PlaintextConsumerCloseTest.testAsyncConsumerCloseWithRemainInGroup(PlaintextConsumerCloseTest.java:145)
> Gradle Test Run :clients:clients-integration-tests:test > Gradle Test Executor 2 > PlaintextConsumerCloseTest > testClassicConsumerCloseWithRemainInGroup() > testClassicConsumerCloseWithRemainInGroup [1] Type=Raft-Isolated, MetadataVersion=4.4-IV0,BrokerSecurityProtocol=PLAINTEXT,BrokerListenerName=ListenerName(EXTERNAL),ControllerSecurityProtocol=PLAINTEXT,ControllerListenerName=ListenerName(CONTROLLER) PASSED
> {code}
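The contract the test above checks can be sketched without a broker. This is a toy model, not Kafka code: ToyGroup, ToyConsumer, and the two-value Operation enum are hypothetical stand-ins for the group coordinator's member list, the consumer, and CloseOptions.GroupMembershipOperation. It only illustrates the expected outcome, namely that a member closed with REMAIN_IN_GROUP is still listed afterwards while one closed with LEAVE_GROUP is not. The idempotent close is an assumption of the sketch, chosen so that a second close (as try-with-resources would issue) does not change membership.

```java
import java.util.Set;
import java.util.TreeSet;

public class RemainInGroupSketch {
    // Hypothetical stand-in for CloseOptions.GroupMembershipOperation.
    enum Operation { LEAVE_GROUP, REMAIN_IN_GROUP }

    // Hypothetical stand-in for the coordinator's view of group membership.
    static final class ToyGroup {
        private final Set<String> members = new TreeSet<>();

        void join(String memberId) { members.add(memberId); }

        void leave(String memberId) { members.remove(memberId); }

        // Returns a sorted snapshot so callers cannot mutate the group.
        Set<String> members() { return new TreeSet<>(members); }
    }

    // Hypothetical consumer that joins on construction.
    static final class ToyConsumer {
        private final ToyGroup group;
        private final String memberId;
        private boolean closed;

        ToyConsumer(ToyGroup group, String memberId) {
            this.group = group;
            this.memberId = memberId;
            group.join(memberId);
        }

        void close(Operation op) {
            if (closed) {
                return; // assumption: a repeated close is a no-op
            }
            closed = true;
            // Only an explicit LEAVE_GROUP removes the member;
            // REMAIN_IN_GROUP leaves membership untouched.
            if (op == Operation.LEAVE_GROUP) {
                group.leave(memberId);
            }
        }
    }

    public static void main(String[] args) {
        ToyGroup group = new ToyGroup();
        ToyConsumer staying = new ToyConsumer(group, "member-a");
        ToyConsumer leaving = new ToyConsumer(group, "member-b");

        staying.close(Operation.REMAIN_IN_GROUP);
        leaving.close(Operation.LEAVE_GROUP);

        System.out.println(group.members()); // prints [member-a]
    }
}
```

In these terms, the failure reported above corresponds to the async consumer behaving as if LEAVE_GROUP had been requested.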