[ https://issues.apache.org/jira/browse/KAFKA-9968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105857#comment-17105857 ]
Man Hin commented on KAFKA-9968:
--------------------------------

Thanks for the suggestions! I tried the test cases with an extra poll, but that doesn't seem to work. Issuing consumer.unsubscribe() before subscribe() does work, though. Do you regard that as a workaround or as the correct usage of the API?

> Newly subscribed topic not present in metadata request
> ------------------------------------------------------
>
>                 Key: KAFKA-9968
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9968
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.5.0, 2.4.1
>            Reporter: Man Hin
>            Priority: Major
>         Attachments: KafkaClientVersionTest.java
>
>
> Our application subscribes to multiple topics one by one. It used to work
> fine, but after we upgraded our Kafka client version from 2.4.0 to 2.4.1,
> our application no longer received messages for the last topic.
> I spotted a warning log from the Kafka client.
> {code:java}
> 2020-05-08 01:01:11.059 [main] [WARN ] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] The following subscribed topics are not assigned to any members: [TopicX]
> {code}
> I'm able to reproduce it with a test case running against a live Kafka broker
> (we are using a v2.4.1 broker).
> {code:java}
> @Test
> public void WHEN_subscribed_sequentially_THEN_receive_assignment() throws InterruptedException, ExecutionException, TimeoutException {
>     // WHEN
>     List<String> topics = new ArrayList<>();
>     topics.add(TOPIC_C);
>     consumer.subscribe(topics);
>     consumer.poll(0);
>     topics.add(TOPIC_B);
>     consumer.subscribe(topics);
>     consumer.poll(0);
>     topics.add(TOPIC_A);
>     consumer.subscribe(topics);
>     consumer.poll(0);
>     // THEN
>     Set<TopicPartition> assignments = consumer.assignment();
>     Set<String> topicSet = assignments.stream().map(p -> p.topic()).distinct().collect(Collectors.toSet());
>     logger.info("Topic: {}", topicSet);
>     assertThat(topicSet, hasItems(TOPIC_C, TOPIC_B, TOPIC_A));
> }
> {code}
> We turned on trace logging and found that the metadata requests always missed
> the last topic we subscribed to.
> {code:java}
> 2020-05-08 01:01:10.665 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): TopicC
> 2020-05-08 01:01:10.983 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer clientId=sample_consumer, groupId=sample_client] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], allowAutoTopicCreation=true, includeClusterA
> 2020-05-08 01:01:11.003 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
> 2020-05-08 01:01:11.015 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
> 2020-05-08 01:01:11.049 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): TopicC, TopicB
> 2020-05-08 01:01:11.053 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer clientId=sample_consumer, groupId=sample_client] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], allowAutoTopicCreation=true, includeClusterA
> 2020-05-08 01:01:11.057 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
> 2020-05-08 01:01:11.062 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): TopicC, TopicB, TopicA
> 2020-05-08 01:01:11.062 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer clientId=sample_consumer, groupId=sample_client] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='TopicB'), MetadataRequestTopic(name='TopicC')], allowAu
> 2020-05-08 01:01:11.064 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=10000, rebalanceTimeoutMs=300000, memberId=
> {code}
> I suspect this is because, since 2.4.1, SubscriptionState.groupSubscription
> contains only the topics returned by the joinGroup response. As a result, the
> newly subscribed topic is left out of the metadata request.
> Up to 2.4.0 the behaviour was to add the topics in the joinGroup response to
> groupSubscription, but in 2.4.1 this changed to replacing them. Maybe this is
> the cause.
> See SubscriptionState in
> https://github.com/apache/kafka/commit/0a5dec0b3a3e1b027230ba766fee0c08b70cc63c
> I tried client v2.5.0 and got the same result.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
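
The add-versus-replace hypothesis in the quoted description can be illustrated with a small stand-alone model. This is only a sketch: GroupSubscriptionSketch and all of its fields and methods are hypothetical names and do not reproduce Kafka's SubscriptionState. It assumes that a metadata request uses the group-level topic set whenever that set is non-empty, that "add" mode also merges topics at subscribe time, and that unsubscribe() clears both sets.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of the hypothesis in the report; NOT Kafka's SubscriptionState. */
final class GroupSubscriptionSketch {
    // true  = the join response replaces the group-level set (suspected 2.4.1 behaviour)
    // false = topics are only ever added to the group-level set (suspected 2.4.0 behaviour)
    private final boolean replaceOnJoin;
    private Set<String> subscription = new TreeSet<>();
    private Set<String> groupSubscription = new TreeSet<>();

    GroupSubscriptionSketch(boolean replaceOnJoin) {
        this.replaceOnJoin = replaceOnJoin;
    }

    void subscribe(Collection<String> topics) {
        subscription = new TreeSet<>(topics);
        if (!replaceOnJoin) {
            // Assumption of this model: "add" mode merges new topics immediately.
            groupSubscription.addAll(topics);
        }
    }

    void unsubscribe() {
        // Assumption of this model: unsubscribing resets both sets.
        subscription = new TreeSet<>();
        groupSubscription = new TreeSet<>();
    }

    /** Topics the next metadata request would name in this model. */
    Set<String> metadataTopics() {
        return new TreeSet<>(groupSubscription.isEmpty() ? subscription : groupSubscription);
    }

    /** One simulated poll: build the metadata request, then complete a join. */
    Set<String> poll() {
        Set<String> requested = metadataTopics();
        if (replaceOnJoin) {
            groupSubscription = new TreeSet<>(subscription);
        } else {
            groupSubscription.addAll(subscription);
        }
        return requested;
    }

    public static void main(String[] args) {
        for (boolean replace : new boolean[] {false, true}) {
            GroupSubscriptionSketch s = new GroupSubscriptionSketch(replace);
            System.out.println(replace ? "replace mode" : "add mode");
            s.subscribe(Arrays.asList("TopicC"));
            System.out.println("  metadata request 1: " + s.poll());
            s.subscribe(Arrays.asList("TopicC", "TopicB"));
            System.out.println("  metadata request 2: " + s.poll());
            s.subscribe(Arrays.asList("TopicC", "TopicB", "TopicA"));
            System.out.println("  metadata request 3: " + s.poll());
            // Workaround from the comment: unsubscribe before subscribing again.
            s.unsubscribe();
            s.subscribe(Arrays.asList("TopicC", "TopicB", "TopicA"));
            System.out.println("  after unsubscribe:  " + s.poll());
        }
    }
}
```

In replace mode the model's metadata request always trails the subscription by one step, which mirrors the trace log in the report. Calling unsubscribe() first empties the group-level set, so the request falls back to the full subscription. Whether the real client behaves this way for the same reason is not confirmed here.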