[ 
https://issues.apache.org/jira/browse/KAFKA-20516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Quah resolved KAFKA-20516.
-------------------------------
    Resolution: Invalid

> close(REMAIN_IN_GROUP) does not work for async consumer
> -------------------------------------------------------
>
>                 Key: KAFKA-20516
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20516
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Sean Quah
>            Priority: Major
>
> The {{REMAIN_IN_GROUP}} option for {{Consumer.close}} is not respected for 
> the async consumer.
> Test:
> {code:java}
> public class PlaintextConsumerCloseTest {
>     ...
>     @ClusterTest
>     public void testClassicConsumerCloseWithRemainInGroup() throws Exception {
>         testCloseWithRemainInGroup(GroupProtocol.CLASSIC);
>     }
>     @ClusterTest
>     public void testAsyncConsumerCloseWithRemainInGroup() throws Exception {
>         testCloseWithRemainInGroup(GroupProtocol.CONSUMER);
>     }
>     private void testCloseWithRemainInGroup(GroupProtocol groupProtocol) 
> throws Exception {
>         var groupId = "group_test";
>         Map<String, Object> consumerConfig = Map.of(
>             GROUP_PROTOCOL_CONFIG, 
> groupProtocol.name().toLowerCase(Locale.ROOT),
>             KEY_DESERIALIZER_CLASS_CONFIG, 
> ByteArrayDeserializer.class.getName(),
>             VALUE_DESERIALIZER_CLASS_CONFIG, 
> ByteArrayDeserializer.class.getName(),
>             AUTO_OFFSET_RESET_CONFIG, "earliest",
>             GROUP_ID_CONFIG, groupId,
>             BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
>         );
>         var topicName = "close-with-remain-in-group";
>         var numRecords = 1;
>         cluster.createTopic(topicName, 2, (short) 2, Map.of());
>         sendRecords(cluster, new TopicPartition(topicName, 0), numRecords);
>         String memberId;
>         try (Consumer<byte[], byte[]> consumer = 
> cluster.consumer(consumerConfig)) {
>             consumer.subscribe(Set.of(topicName));
>             consumeRecords(consumer, numRecords);
>             memberId = consumer.groupMetadata().memberId();
>             consumer.close(CloseOptions.groupMembershipOperation(
>                 CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
>         }
>         try (Admin admin = cluster.admin()) {
>             var members = admin.describeConsumerGroups(List.of(groupId))
>                 .describedGroups()
>                 .get(groupId)
>                 .get()
>                 .members();
>             assertTrue(
>                 members.stream().anyMatch(m -> 
> memberId.equals(m.consumerId())),
>                 "Expected member " + memberId + " to still be in " + groupId
>                     + " after close(REMAIN_IN_GROUP), but members were " + 
> members);
>         }
>     }
> }
> {code}
> Test result:
> {code}
> Gradle Test Run :clients:clients-integration-tests:test > Gradle Test 
> Executor 2 > PlaintextConsumerCloseTest > 
> testAsyncConsumerCloseWithRemainInGroup() > 
> testAsyncConsumerCloseWithRemainInGroup [1] Type=Raft-Isolated, 
> MetadataVersion=4.4-IV0,BrokerSecurityProtocol=PLAINTEXT,BrokerListenerName=ListenerName(EXTERNAL),ControllerSecurityProtocol=PLAINTEXT,ControllerListenerName=ListenerName(CONTROLLER)
>  FAILED
>     org.opentest4j.AssertionFailedError: Expected member 
> S9li7ioVTbW5IsD3C_WWzQ to still be in group_test after 
> close(REMAIN_IN_GROUP), but members were [] ==> expected: <true> but was: 
> <false>
>         at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>         at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>         at 
> app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>         at 
> app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>         at 
> app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
>         at 
> app//org.apache.kafka.clients.consumer.PlaintextConsumerCloseTest.testCloseWithRemainInGroup(PlaintextConsumerCloseTest.java:179)
>         at 
> app//org.apache.kafka.clients.consumer.PlaintextConsumerCloseTest.testAsyncConsumerCloseWithRemainInGroup(PlaintextConsumerCloseTest.java:145)
> Gradle Test Run :clients:clients-integration-tests:test > Gradle Test 
> Executor 2 > PlaintextConsumerCloseTest > 
> testClassicConsumerCloseWithRemainInGroup() > 
> testClassicConsumerCloseWithRemainInGroup [1] Type=Raft-Isolated, 
> MetadataVersion=4.4-IV0,BrokerSecurityProtocol=PLAINTEXT,BrokerListenerName=ListenerName(EXTERNAL),ControllerSecurityProtocol=PLAINTEXT,ControllerListenerName=ListenerName(CONTROLLER)
>  PASSED
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to