Sean Quah created KAFKA-20516:
---------------------------------
Summary: close(REMAIN_IN_GROUP) does not work for async consumer
Key: KAFKA-20516
URL: https://issues.apache.org/jira/browse/KAFKA-20516
Project: Kafka
Issue Type: Bug
Components: clients
Reporter: Sean Quah
TheĀ {{REMAIN_IN_GROUP}} option for {{Consumer.close}} is not respected for the
async consumer.
Test:
{code:java}
public class PlaintextConsumerCloseTest {
...
@ClusterTest
public void testClassicConsumerCloseWithRemainInGroup() throws Exception {
testCloseWithRemainInGroup(GroupProtocol.CLASSIC);
}
@ClusterTest
public void testAsyncConsumerCloseWithRemainInGroup() throws Exception {
testCloseWithRemainInGroup(GroupProtocol.CONSUMER);
}
private void testCloseWithRemainInGroup(GroupProtocol groupProtocol) throws
Exception {
var groupId = "group_test";
Map<String, Object> consumerConfig = Map.of(
GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT),
KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName(),
VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName(),
AUTO_OFFSET_RESET_CONFIG, "earliest",
GROUP_ID_CONFIG, groupId,
BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
);
var topicName = "close-with-remain-in-group";
var numRecords = 1;
cluster.createTopic(topicName, 2, (short) 2, Map.of());
sendRecords(cluster, new TopicPartition(topicName, 0), numRecords);
String memberId;
try (Consumer<byte[], byte[]> consumer =
cluster.consumer(consumerConfig)) {
consumer.subscribe(Set.of(topicName));
consumeRecords(consumer, numRecords);
memberId = consumer.groupMetadata().memberId();
consumer.close(CloseOptions.groupMembershipOperation(
CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
}
try (Admin admin = cluster.admin()) {
var members = admin.describeConsumerGroups(List.of(groupId))
.describedGroups()
.get(groupId)
.get()
.members();
assertTrue(
members.stream().anyMatch(m -> memberId.equals(m.consumerId())),
"Expected member " + memberId + " to still be in " + groupId
+ " after close(REMAIN_IN_GROUP), but members were " +
members);
}
}
}
{code}
Test result:
{code}
Gradle Test Run :clients:clients-integration-tests:test > Gradle Test Executor
2 > PlaintextConsumerCloseTest > testAsyncConsumerCloseWithRemainInGroup() >
testAsyncConsumerCloseWithRemainInGroup [1] Type=Raft-Isolated,
MetadataVersion=4.4-IV0,BrokerSecurityProtocol=PLAINTEXT,BrokerListenerName=ListenerName(EXTERNAL),ControllerSecurityProtocol=PLAINTEXT,ControllerListenerName=ListenerName(CONTROLLER)
FAILED
org.opentest4j.AssertionFailedError: Expected member S9li7ioVTbW5IsD3C_WWzQ
to still be in group_test after close(REMAIN_IN_GROUP), but members were [] ==>
expected: <true> but was: <false>
at
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
at
app//org.apache.kafka.clients.consumer.PlaintextConsumerCloseTest.testCloseWithRemainInGroup(PlaintextConsumerCloseTest.java:179)
at
app//org.apache.kafka.clients.consumer.PlaintextConsumerCloseTest.testAsyncConsumerCloseWithRemainInGroup(PlaintextConsumerCloseTest.java:145)
Gradle Test Run :clients:clients-integration-tests:test > Gradle Test Executor
2 > PlaintextConsumerCloseTest > testClassicConsumerCloseWithRemainInGroup() >
testClassicConsumerCloseWithRemainInGroup [1] Type=Raft-Isolated,
MetadataVersion=4.4-IV0,BrokerSecurityProtocol=PLAINTEXT,BrokerListenerName=ListenerName(EXTERNAL),ControllerSecurityProtocol=PLAINTEXT,ControllerListenerName=ListenerName(CONTROLLER)
PASSED
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)