[ https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568531#comment-16568531 ]
ASF GitHub Bot commented on KAFKA-6648: --------------------------------------- lindong28 closed pull request #4679: KAFKA-6648 - Fetcher.getTopicMetadata() should return all partitions for each requested topic URL: https://github.com/apache/kafka/pull/4679 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index ca8e0d26c81..89ea95dab4e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -313,7 +313,7 @@ else if (error.exception() instanceof RetriableException) if (!shouldRetry) { HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>(); for (String topic : cluster.topics()) - topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic)); + topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic)); return topicsPartitionInfos; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 9164daa426c..af50b5272de 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -84,6 +84,7 @@ import org.apache.kafka.test.MockSelector; import org.apache.kafka.test.TestUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -1388,6 +1389,51 @@ public void testGetTopicMetadataLeaderNotAvailable() { 
assertTrue(topicMetadata.containsKey(topicName)); } + @Test + public void testGetTopicMetadataOfflinePartitions() { + MetadataResponse originalResponse = newMetadataResponse(topicName, Errors.NONE); //baseline ok response + + //create a response based on the above one with all partitions being leaderless + List<MetadataResponse.TopicMetadata> altTopics = new ArrayList<>(); + for (MetadataResponse.TopicMetadata item : originalResponse.topicMetadata()) { + List<MetadataResponse.PartitionMetadata> partitions = item.partitionMetadata(); + List<MetadataResponse.PartitionMetadata> altPartitions = new ArrayList<>(); + for (MetadataResponse.PartitionMetadata p : partitions) { + altPartitions.add(new MetadataResponse.PartitionMetadata( + p.error(), + p.partition(), + null, //no leader + p.replicas(), + p.isr(), + p.offlineReplicas()) + ); + } + MetadataResponse.TopicMetadata alteredTopic = new MetadataResponse.TopicMetadata( + item.error(), + item.topic(), + item.isInternal(), + altPartitions + ); + altTopics.add(alteredTopic); + } + Node controller = originalResponse.controller(); + MetadataResponse altered = new MetadataResponse( + (List<Node>) originalResponse.brokers(), + originalResponse.clusterId(), + controller != null ? controller.id() : MetadataResponse.NO_CONTROLLER_ID, + altTopics); + + client.prepareResponse(altered); + + Map<String, List<PartitionInfo>> topicMetadata = + fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(topicName), false), 5000L); + + Assert.assertNotNull(topicMetadata); + Assert.assertNotNull(topicMetadata.get(topicName)); + //noinspection ConstantConditions + Assert.assertEquals((int) cluster.partitionCountForTopic(topicName), topicMetadata.get(topicName).size()); + } + /* * Send multiple requests. Verify that the client side quota metrics have the right values */ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fetcher.getTopicMetadata() should return all partitions for each requested > topic > -------------------------------------------------------------------------------- > > Key: KAFKA-6648 > URL: https://issues.apache.org/jira/browse/KAFKA-6648 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 0.11.0.2, 1.0.0 > Reporter: radai rosenblatt > Assignee: radai rosenblatt > Priority: Major > Fix For: 2.1.0 > > > {code} > if (!shouldRetry) { > HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new > HashMap<>(); > for (String topic : cluster.topics()) > topicsPartitionInfos.put(topic, > cluster.availablePartitionsForTopic(topic)); > return topicsPartitionInfos; > } > {code} > This leads to inconsistent behavior upstream, for example in > KafkaConsumer.partitionsFor(), where if there's valid metadata all partitions > would be returned, whereas if the metadata doesn't exist (or has expired) a subset of > partitions (only the healthy ones) would be returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)