Sean Quah created KAFKA-20516:
---------------------------------

             Summary: close(REMAIN_IN_GROUP) does not work for async consumer
                 Key: KAFKA-20516
                 URL: https://issues.apache.org/jira/browse/KAFKA-20516
             Project: Kafka
          Issue Type: Bug
          Components: clients
            Reporter: Sean Quah


TheĀ {{REMAIN_IN_GROUP}} option for {{Consumer.close}} is not respected for the 
async consumer.

Test:
{code:java}
public class PlaintextConsumerCloseTest {
    ...

    @ClusterTest
    public void testClassicConsumerCloseWithRemainInGroup() throws Exception {
        testCloseWithRemainInGroup(GroupProtocol.CLASSIC);
    }

    @ClusterTest
    public void testAsyncConsumerCloseWithRemainInGroup() throws Exception {
        testCloseWithRemainInGroup(GroupProtocol.CONSUMER);
    }

    private void testCloseWithRemainInGroup(GroupProtocol groupProtocol) throws 
Exception {
        var groupId = "group_test";
        Map<String, Object> consumerConfig = Map.of(
            GROUP_PROTOCOL_CONFIG, 
groupProtocol.name().toLowerCase(Locale.ROOT),
            KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
            VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName(),
            AUTO_OFFSET_RESET_CONFIG, "earliest",
            GROUP_ID_CONFIG, groupId,
            BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
        );
        var topicName = "close-with-remain-in-group";
        var numRecords = 1;
        cluster.createTopic(topicName, 2, (short) 2, Map.of());

        sendRecords(cluster, new TopicPartition(topicName, 0), numRecords);

        String memberId;
        try (Consumer<byte[], byte[]> consumer = 
cluster.consumer(consumerConfig)) {
            consumer.subscribe(Set.of(topicName));
            consumeRecords(consumer, numRecords);
            memberId = consumer.groupMetadata().memberId();
            consumer.close(CloseOptions.groupMembershipOperation(
                CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
        }

        try (Admin admin = cluster.admin()) {
            var members = admin.describeConsumerGroups(List.of(groupId))
                .describedGroups()
                .get(groupId)
                .get()
                .members();
            assertTrue(
                members.stream().anyMatch(m -> memberId.equals(m.consumerId())),
                "Expected member " + memberId + " to still be in " + groupId
                    + " after close(REMAIN_IN_GROUP), but members were " + 
members);
        }
    }
}
{code}

Test result:
{code}
Gradle Test Run :clients:clients-integration-tests:test > Gradle Test Executor 
2 > PlaintextConsumerCloseTest > testAsyncConsumerCloseWithRemainInGroup() > 
testAsyncConsumerCloseWithRemainInGroup [1] Type=Raft-Isolated, 
MetadataVersion=4.4-IV0,BrokerSecurityProtocol=PLAINTEXT,BrokerListenerName=ListenerName(EXTERNAL),ControllerSecurityProtocol=PLAINTEXT,ControllerListenerName=ListenerName(CONTROLLER)
 FAILED
    org.opentest4j.AssertionFailedError: Expected member S9li7ioVTbW5IsD3C_WWzQ 
to still be in group_test after close(REMAIN_IN_GROUP), but members were [] ==> 
expected: <true> but was: <false>
        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
        at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
        at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
        at 
app//org.apache.kafka.clients.consumer.PlaintextConsumerCloseTest.testCloseWithRemainInGroup(PlaintextConsumerCloseTest.java:179)
        at 
app//org.apache.kafka.clients.consumer.PlaintextConsumerCloseTest.testAsyncConsumerCloseWithRemainInGroup(PlaintextConsumerCloseTest.java:145)

Gradle Test Run :clients:clients-integration-tests:test > Gradle Test Executor 
2 > PlaintextConsumerCloseTest > testClassicConsumerCloseWithRemainInGroup() > 
testClassicConsumerCloseWithRemainInGroup [1] Type=Raft-Isolated, 
MetadataVersion=4.4-IV0,BrokerSecurityProtocol=PLAINTEXT,BrokerListenerName=ListenerName(EXTERNAL),ControllerSecurityProtocol=PLAINTEXT,ControllerListenerName=ListenerName(CONTROLLER)
 PASSED
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to