jolshan commented on code in PR #15023:
URL: https://github.com/apache/kafka/pull/15023#discussion_r1432814990


##########
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java:
##########
@@ -1067,25 +1067,102 @@ protected boolean retainTopic(String topic, boolean 
isInternal, long nowMs) {
         assertEquals(Uuid.ZERO_UUID, cluster.topicId("validTopic1"));
     }
 
+    @Test
+    public void testTopicMetadataOnUpdatePartitionLeadership() {
+        String topic = "input-topic";
+        Uuid topicId = Uuid.randomUuid();
+
+        Time time = new MockTime();
+
+        metadata = new Metadata(
+            refreshBackoffMs,
+            refreshBackoffMaxMs,
+            metadataExpireMs,
+            new LogContext(),
+            new ClusterResourceListeners());
+        Node node1 = new Node(1, "localhost", 9091);
+        Node node2 = new Node(2, "localhost", 9091);
+
+        TopicPartition tp0 = new TopicPartition(topic, 0);
+        MetadataResponse.PartitionMetadata partition0 = new 
MetadataResponse.PartitionMetadata(
+            Errors.NONE,
+            tp0,
+            Optional.of(1),
+            Optional.of(1),
+            Arrays.asList(1, 2),
+            Arrays.asList(1, 2),
+            Collections.emptyList()
+        );
+        TopicPartition tp1 = new TopicPartition(topic, 1);
+        MetadataResponse.PartitionMetadata partition1 =
+            new MetadataResponse.PartitionMetadata(
+            Errors.NONE,
+            tp1,
+            Optional.of(1),
+            Optional.of(1),
+            Arrays.asList(1, 2),
+            Arrays.asList(1, 2),
+            Collections.emptyList()
+        );
+        MetadataResponse.TopicMetadata topicMetadata = new 
MetadataResponse.TopicMetadata(
+            Errors.NONE,
+            topic,
+            topicId,
+            false,
+            Arrays.asList(partition0, partition1),
+            MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED
+        );
+
+        // Initialize metadata with two partitions
+        MetadataResponse response = RequestTestUtils.metadataResponse(
+            Arrays.asList(node1, node2),
+            "clusterId",
+            node1.id(),
+            Collections.singletonList(topicMetadata));
+        metadata.updateWithCurrentRequestVersion(
+            response,
+            false,
+            time.milliseconds());
+        assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+        assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+        assertEquals(1, metadata.fetch().partition(tp1).leader().id());
+
+        // "input-topic" partition 1 leader changes from node 1 to node 2
+        metadata.updatePartitionLeadership(
+            Collections.singletonMap(
+                tp1,
+                new Metadata.LeaderIdAndEpoch(
+                    Optional.of(2),
+                    Optional.of(3)
+            )),
+            Arrays.asList(node1)
+        );
+        assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
+        assertEquals(1, metadata.fetch().partition(tp0).leader().id());
+        assertEquals(2, metadata.fetch().partition(tp1).leader().id());
+    }
+
     @Test
     public void testUpdatePartitionLeadership() {
         Time time = new MockTime();
 
-        // Setup metadata with initial set of 2 partitions, 1 each across 
topics, with 5 nodes.
-        // Also setup, 1 invalid topic, 1 unauthorized topic, 1 internal topic.
+        // Initialize metadata
         int numNodes = 5;
         metadata = new Metadata(refreshBackoffMs, refreshBackoffMaxMs, 
metadataExpireMs, new LogContext(), new ClusterResourceListeners());
         ClusterResourceListener mockListener = 
Mockito.mock(ClusterResourceListener.class);
         metadata.addClusterUpdateListener(mockListener);
-
+        // topic1 has 2 partitions: tp11, tp12
+        // topic2 has 1 partition: tp21
         String topic1 = "topic1";
-        TopicPartition partition1 = new TopicPartition(topic1, 0);
-        PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, 
partition1, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), 
Arrays.asList(1, 2), Arrays.asList(3));
+        TopicPartition tp11 = new TopicPartition(topic1, 0);
+        PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, 
tp11, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), Arrays.asList(1, 
2), Arrays.asList(3));
         Uuid topic1Id = Uuid.randomUuid();
+        TopicPartition tp12 = new TopicPartition(topic1, 1);
+        PartitionMetadata part12Metadata = new PartitionMetadata(Errors.NONE, 
tp12, Optional.of(2), Optional.of(200), Arrays.asList(2, 3), Arrays.asList(2, 
3), Arrays.asList(1));

Review Comment:
   nit: (we can address in a followup refactor) this naming is a bit confusing. 
Maybe all should be part11Metadata, part12Metadata, part21Metadata or even just 
tp11Metadata, tp12Metadata etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to