[ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568531#comment-16568531
 ] 

ASF GitHub Bot commented on KAFKA-6648:
---------------------------------------

lindong28 closed pull request #4679: KAFKA-6648 - Fetcher.getTopicMetadata() 
should return all partitions for each requested topic
URL: https://github.com/apache/kafka/pull/4679
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ca8e0d26c81..89ea95dab4e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -313,7 +313,7 @@ else if (error.exception() instanceof RetriableException)
                 if (!shouldRetry) {
                     HashMap<String, List<PartitionInfo>> topicsPartitionInfos 
= new HashMap<>();
                     for (String topic : cluster.topics())
-                        topicsPartitionInfos.put(topic, 
cluster.availablePartitionsForTopic(topic));
+                        topicsPartitionInfos.put(topic, 
cluster.partitionsForTopic(topic));
                     return topicsPartitionInfos;
                 }
             }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 9164daa426c..af50b5272de 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -84,6 +84,7 @@
 import org.apache.kafka.test.MockSelector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -1388,6 +1389,51 @@ public void testGetTopicMetadataLeaderNotAvailable() {
         assertTrue(topicMetadata.containsKey(topicName));
     }
 
+    @Test
+    public void testGetTopicMetadataOfflinePartitions() {
+        MetadataResponse originalResponse = newMetadataResponse(topicName, 
Errors.NONE); //baseline ok response
+
+        //create a response based on the above one with all partitions being 
leaderless
+        List<MetadataResponse.TopicMetadata> altTopics = new ArrayList<>();
+        for (MetadataResponse.TopicMetadata item : 
originalResponse.topicMetadata()) {
+            List<MetadataResponse.PartitionMetadata> partitions = 
item.partitionMetadata();
+            List<MetadataResponse.PartitionMetadata> altPartitions = new 
ArrayList<>();
+            for (MetadataResponse.PartitionMetadata p : partitions) {
+                altPartitions.add(new MetadataResponse.PartitionMetadata(
+                    p.error(),
+                    p.partition(),
+                    null, //no leader
+                    p.replicas(),
+                    p.isr(),
+                    p.offlineReplicas())
+                );
+            }
+            MetadataResponse.TopicMetadata alteredTopic = new 
MetadataResponse.TopicMetadata(
+                item.error(),
+                item.topic(),
+                item.isInternal(),
+                altPartitions
+            );
+            altTopics.add(alteredTopic);
+        }
+        Node controller = originalResponse.controller();
+        MetadataResponse altered = new MetadataResponse(
+            (List<Node>) originalResponse.brokers(),
+            originalResponse.clusterId(),
+            controller != null ? controller.id() : 
MetadataResponse.NO_CONTROLLER_ID,
+            altTopics);
+
+        client.prepareResponse(altered);
+
+        Map<String, List<PartitionInfo>> topicMetadata =
+            fetcher.getTopicMetadata(new 
MetadataRequest.Builder(Collections.singletonList(topicName), false), 5000L);
+
+        Assert.assertNotNull(topicMetadata);
+        Assert.assertNotNull(topicMetadata.get(topicName));
+        //noinspection ConstantConditions
+        Assert.assertEquals((int) cluster.partitionCountForTopic(topicName), 
topicMetadata.get(topicName).size());
+    }
+
     /*
      * Send multiple requests. Verify that the client side quota metrics have 
the right values
      */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fetcher.getTopicMetadata() should return all partitions for each requested 
> topic
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-6648
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6648
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.11.0.2, 1.0.0
>            Reporter: radai rosenblatt
>            Assignee: radai rosenblatt
>            Priority: Major
>             Fix For: 2.1.0
>
>
> {code}
> if (!shouldRetry) {
>    HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new 
> HashMap<>();
>    for (String topic : cluster.topics())
>       topicsPartitionInfos.put(topic, 
> cluster.availablePartitionsForTopic(topic));
>    return topicsPartitionInfos;
> }
> {code}
> this leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(), where if there's valid metadata all partitions 
> would be returned, whereas if MD doesnt exist (or has expired) a subset of 
> partitions (only the healthy ones) would be returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to