Man Hin created KAFKA-9968:
------------------------------
Summary: Newly subscribed topic not present in metadata request
Key: KAFKA-9968
URL: https://issues.apache.org/jira/browse/KAFKA-9968
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.4.1, 2.5.0
Reporter: Man Hin
Our application subscribes to multiple topics one by one. It used to work fine.
But after we upgraded our Kafka client version from 2.4.0 to 2.4.1, our
application no longer receives messages for the last topic it subscribes to.
I spotted a warning log from the Kafka client.
{code:java}
2020-05-08 01:01:11.059 [main] [WARN ] o.a.k.c.c.i.ConsumerCoordinator -
[Consumer clientId=sample_consumer, groupId=sample_client] The following
subscribed topics are not assigned to any members: [TopicX] {code}
I'm able to reproduce it with a test case running against a live Kafka broker
(we are using v2.4.1 broker).
{code:java}
@Test
public void WHEN_subscribed_sequentially_THEN_receive_assignment()
        throws InterruptedException, ExecutionException, TimeoutException {
    // WHEN
    List<String> topics = new ArrayList<>();
    topics.add(TOPIC_C);
    consumer.subscribe(topics);
    consumer.poll(0);

    topics.add(TOPIC_B);
    consumer.subscribe(topics);
    consumer.poll(0);

    topics.add(TOPIC_A);
    consumer.subscribe(topics);
    consumer.poll(0);

    // THEN
    Set<TopicPartition> assignments = consumer.assignment();
    Set<String> topicSet = assignments.stream()
            .map(TopicPartition::topic)
            .collect(Collectors.toSet());
    logger.info("Topic: {}", topicSet);
    assertThat(topicSet, hasItems(TOPIC_C, TOPIC_B, TOPIC_A));
}
{code}
We turned on trace logging and found that each metadata request omitted the
topic we had most recently subscribed to.
{code:java}
2020-05-08 01:01:10.665 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer
clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): TopicC
2020-05-08 01:01:10.983 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer
clientId=sample_consumer, groupId=sample_client] Sending metadata request
MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')],
allowAutoTopicCreation=true, includeClusterA
2020-05-08 01:01:11.003 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator -
[Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup
(JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000,
rebalanceTimeoutMs=300000, memberId=
2020-05-08 01:01:11.015 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator -
[Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup
(JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000,
rebalanceTimeoutMs=300000, memberId=
2020-05-08 01:01:11.049 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer
clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s):
TopicC, TopicB
2020-05-08 01:01:11.053 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer
clientId=sample_consumer, groupId=sample_client] Sending metadata request
MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')],
allowAutoTopicCreation=true, includeClusterA
2020-05-08 01:01:11.057 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator -
[Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup
(JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000,
rebalanceTimeoutMs=300000, memberId=
2020-05-08 01:01:11.062 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer
clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s):
TopicC, TopicB, TopicA
2020-05-08 01:01:11.062 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer
clientId=sample_consumer, groupId=sample_client] Sending metadata request
MetadataRequestData(topics=[MetadataRequestTopic(name='TopicB'),
MetadataRequestTopic(name='TopicC')], allowAu
2020-05-08 01:01:11.064 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator -
[Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup
(JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000,
rebalanceTimeoutMs=300000, memberId=
{code}
I suspect this is because, since 2.4.1, SubscriptionState.groupSubscription
contains only the topics returned in the joinGroup response. As a result, the
newly subscribed topic is left out of the metadata request.
In 2.4.0 and earlier, the topics in the joinGroup response were added to
groupSubscription; in 2.4.1 they replace it. This may be the cause. See
SubscriptionState in
https://github.com/apache/kafka/commit/0a5dec0b3a3e1b027230ba766fee0c08b70cc63c
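To make the suspected difference concrete, here is a toy model of "add" versus "replace". It is not Kafka's SubscriptionState code: the class and method names are hypothetical, and it assumes the tracked topic set already holds the newly subscribed topic when a joinGroup response listing only the older topics is applied.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Toy model only. This is NOT Kafka's SubscriptionState; all names are hypothetical.
final class GroupSubscriptionSketch {

    // "Add" semantics: topics from the join response are merged into the tracked set,
    // so a topic that is already tracked survives.
    static Set<String> applyByAdding(Set<String> tracked, Set<String> fromJoinResponse) {
        Set<String> result = new TreeSet<>(tracked);
        result.addAll(fromJoinResponse);
        return result;
    }

    // "Replace" semantics: the tracked set is discarded and only the topics from the
    // join response remain. The first parameter is deliberately ignored.
    static Set<String> applyByReplacing(Set<String> tracked, Set<String> fromJoinResponse) {
        return new TreeSet<>(fromJoinResponse);
    }

    public static void main(String[] args) {
        // Assumption: TopicB was just subscribed, but the join response still lists only TopicC.
        Set<String> tracked = new TreeSet<>(Arrays.asList("TopicC", "TopicB"));
        Set<String> joinResponse = new TreeSet<>(Arrays.asList("TopicC"));

        System.out.println("add:     " + applyByAdding(tracked, joinResponse));
        System.out.println("replace: " + applyByReplacing(tracked, joinResponse));
    }
}
```

Under these assumptions, the "replace" variant drops TopicB, which would match a metadata request that lacks the newest topic. Whether the client actually reaches this state is exactly what needs confirming.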
I tried client v2.5.0 and got the same result.