msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1483161801


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1564,97 +1597,18 @@ public void testDrainWithANodeThatDoesntHostAnyPartitions() {
             CompressionType.NONE, lingerMs);
 
         // Create cluster metadata, node2 doesn't host any partitions.
-        part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
-        cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1),
+        PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, tp1, Optional.of(node1.id()), Optional.empty(), null, null, null);
+        PartitionInfo part1 = MetadataResponse.toPartitionInfo(part1Metadata, nodesById);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1),
             Collections.emptySet(), Collections.emptySet());
-        metadataMock = Mockito.mock(Metadata.class);
-        Mockito.when(metadataMock.fetch()).thenReturn(cluster);
-        Mockito.when(metadataMock.currentLeader(tp1)).thenReturn(
-            new Metadata.LeaderAndEpoch(Optional.of(node1),
-                Optional.of(999 /* dummy value */)));
+        InternalCluster internalCluster = new InternalCluster(cluster, Collections.singletonMap(tp1, part1Metadata));
 
         // Drain for node2, it should return 0 batches,
-        Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock,
+        Map<Integer, List<ProducerBatch>> batches = accum.drain(internalCluster,
             new HashSet<>(Arrays.asList(node2)), 999999 /* maxSize */, time.milliseconds());
         assertTrue(batches.get(node2.id()).isEmpty());
     }
 
-    @Test
-    public void testDrainOnANodeWhenItCeasesToBeALeader() throws InterruptedException {

Review Comment:
   This test is no longer needed because `drainBatchesForOneNode` now uses
   `InternalCluster` rather than `Metadata`. Because `Metadata` is mutable, a
   node can be a partition leader and then leadership can move to another node.
   That cannot happen with `InternalCluster`, which is immutable.
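
To illustrate the point, here is a minimal, self-contained sketch of the difference between reading leadership from a mutable view and reading it from an immutable snapshot. `MutableLeaders` and `LeaderSnapshot` are hypothetical stand-ins invented for this example; they are not the `Metadata` or `InternalCluster` classes from the PR and make no claim about how those are implemented.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SnapshotDrainSketch {

    // Hypothetical stand-in for a mutable metadata view:
    // the leader of a partition can change between two reads.
    static final class MutableLeaders {
        private final Map<String, Integer> leaders = new HashMap<>();

        synchronized void setLeader(String partition, int nodeId) {
            leaders.put(partition, nodeId);
        }

        synchronized Integer leaderFor(String partition) {
            return leaders.get(partition);
        }

        // Copy the current state into an immutable snapshot.
        synchronized LeaderSnapshot snapshot() {
            return new LeaderSnapshot(leaders);
        }
    }

    // Hypothetical stand-in for an immutable cluster snapshot:
    // once built, the partition-to-leader mapping never changes.
    static final class LeaderSnapshot {
        private final Map<String, Integer> leaders;

        LeaderSnapshot(Map<String, Integer> source) {
            // Defensive copy, so later changes to the source map are not visible here.
            this.leaders = Collections.unmodifiableMap(new HashMap<>(source));
        }

        Integer leaderFor(String partition) {
            return leaders.get(partition);
        }
    }

    public static void main(String[] args) {
        MutableLeaders metadata = new MutableLeaders();
        metadata.setLeader("topic-0", 1);

        // Take the snapshot once, before "draining".
        LeaderSnapshot snapshot = metadata.snapshot();

        // Leadership moves to another node after the snapshot was taken.
        metadata.setLeader("topic-0", 2);

        System.out.println("mutable view leader: " + metadata.leaderFor("topic-0"));
        System.out.println("snapshot leader:     " + snapshot.leaderFor("topic-0"));
    }
}
```

Code that works against the snapshot sees one consistent leader for the whole operation, so a scenario where the leader changes partway through cannot arise from the snapshot itself.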


