msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1483161801
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ########## @@ -1564,97 +1597,18 @@ public void testDrainWithANodeThatDoesntHostAnyPartitions() { CompressionType.NONE, lingerMs); // Create cluster metadata, node2 doesn't host any partitions. - part1 = new PartitionInfo(topic, partition1, node1, null, null, null); - cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1), + PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, tp1, Optional.of(node1.id()), Optional.empty(), null, null, null); + PartitionInfo part1 = MetadataResponse.toPartitionInfo(part1Metadata, nodesById); + Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1), Collections.emptySet(), Collections.emptySet()); - metadataMock = Mockito.mock(Metadata.class); - Mockito.when(metadataMock.fetch()).thenReturn(cluster); - Mockito.when(metadataMock.currentLeader(tp1)).thenReturn( - new Metadata.LeaderAndEpoch(Optional.of(node1), - Optional.of(999 /* dummy value */))); + InternalCluster internalCluster = new InternalCluster(cluster, Collections.singletonMap(tp1, part1Metadata)); // Drain for node2, it should return 0 batches, - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, + Map<Integer, List<ProducerBatch>> batches = accum.drain(internalCluster, new HashSet<>(Arrays.asList(node2)), 999999 /* maxSize */, time.milliseconds()); assertTrue(batches.get(node2.id()).isEmpty()); } - @Test - public void testDrainOnANodeWhenItCeasesToBeALeader() throws InterruptedException { Review Comment: This is no longer needed as `drainBatchesForOneNode` uses `InternalCluster` now Vs `Metadata` earlier. With `Metadata` is mutable, it can happen a node is a partition leader but then leadership moves another node. This is not possible as `InternalCluster` is immutable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org