mimaison commented on code in PR #15479:
URL: https://github.com/apache/kafka/pull/15479#discussion_r1515846272


##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java:
##########
@@ -199,6 +210,54 @@ public void testHandleResponseSanityCheck() {
         assertTrue(result.unmappedKeys.isEmpty());
     }
 
+    // This is a more complicated test which ensures that DeleteRecords requests for multiple
+    // leader nodes are correctly divided up among the nodes based on leadership.
+    // node1 leads t0p0 and t0p2, while node2 leads t0p1 and t0p3.
+    @Test
+    public void testBuildRequestMultipleLeaders() {
+        MetadataResponseData metadataResponseData = new MetadataResponseData();
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic();
+        topicMetadata.setName("t0").setErrorCode(Errors.NONE.code());
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(0).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(1).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(2).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(3).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code()));
+        metadataResponseData.topics().add(topicMetadata);
+        MetadataResponse metadataResponse = new MetadataResponse(metadataResponseData, ApiKeys.METADATA.latestVersion());
+
+        DeleteRecordsHandler handler = new DeleteRecordsHandler(recordsToDelete, logContext, timeout);
+        AdminApiLookupStrategy<TopicPartition> strategy = handler.lookupStrategy();
+        assertInstanceOf(PartitionLeaderStrategy.class, strategy);
+        PartitionLeaderStrategy specificStrategy = (PartitionLeaderStrategy) strategy;
+        MetadataRequest request = specificStrategy.buildRequest(mkSet(t0p0, t0p1, t0p2, t0p3)).build();
+        assertEquals(mkSet("t0"), new HashSet<>(request.topics()));
+
+        Set<TopicPartition> tpSet = mkSet(t0p0, t0p1, t0p2, t0p3);
+        LookupResult<TopicPartition> lookupResult = strategy.handleResponse(tpSet, metadataResponse);
+        assertEquals(emptyMap(), lookupResult.failedKeys);
+        assertEquals(tpSet, lookupResult.mappedKeys.keySet());
+
+        Map<Integer, Set<TopicPartition>> flippedMapping = new HashMap<>();

Review Comment:
   What about naming this variable something like `partitionsPerBroker`?



##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java:
##########
@@ -199,6 +210,54 @@ public void testHandleResponseSanityCheck() {
         assertTrue(result.unmappedKeys.isEmpty());
     }
 
+    // This is a more complicated test which ensures that DeleteRecords requests for multiple
+    // leader nodes are correctly divided up among the nodes based on leadership.
+    // node1 leads t0p0 and t0p2, while node2 leads t0p1 and t0p3.
+    @Test
+    public void testBuildRequestMultipleLeaders() {
+        MetadataResponseData metadataResponseData = new MetadataResponseData();
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic();
+        topicMetadata.setName("t0").setErrorCode(Errors.NONE.code());
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(0).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(1).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(2).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(3).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code()));
+        metadataResponseData.topics().add(topicMetadata);
+        MetadataResponse metadataResponse = new MetadataResponse(metadataResponseData, ApiKeys.METADATA.latestVersion());
+
+        DeleteRecordsHandler handler = new DeleteRecordsHandler(recordsToDelete, logContext, timeout);
+        AdminApiLookupStrategy<TopicPartition> strategy = handler.lookupStrategy();
+        assertInstanceOf(PartitionLeaderStrategy.class, strategy);
+        PartitionLeaderStrategy specificStrategy = (PartitionLeaderStrategy) strategy;
+        MetadataRequest request = specificStrategy.buildRequest(mkSet(t0p0, t0p1, t0p2, t0p3)).build();
+        assertEquals(mkSet("t0"), new HashSet<>(request.topics()));
+
+        Set<TopicPartition> tpSet = mkSet(t0p0, t0p1, t0p2, t0p3);
+        LookupResult<TopicPartition> lookupResult = strategy.handleResponse(tpSet, metadataResponse);
+        assertEquals(emptyMap(), lookupResult.failedKeys);
+        assertEquals(tpSet, lookupResult.mappedKeys.keySet());
+
+        Map<Integer, Set<TopicPartition>> flippedMapping = new HashMap<>();
+        lookupResult.mappedKeys.forEach((tp, node) -> flippedMapping.computeIfAbsent(node, key -> new HashSet<>()).add(tp));
+
+        DeleteRecordsRequest deleteRequest = handler.buildBatchedRequest(node1.id(), flippedMapping.get(1)).build();
+        assertEquals(2, deleteRequest.data().topics().get(0).partitions().size());
+        assertEquals(mkSet(t0p0, t0p2),
+                new HashSet<>(deleteRequest.data().topics().get(0).partitions().stream()
+                        .map(drp -> new TopicPartition("t0", drp.partitionIndex()))
+                        .collect(Collectors.toList())));

Review Comment:
   This can be simplified into:
   ```
   assertEquals(mkSet(t0p0, t0p2),
                   deleteRequest.data().topics().get(0).partitions().stream()
                           .map(drp -> new TopicPartition("t0", drp.partitionIndex()))
                           .collect(Collectors.toSet()));
   ```
   
   Same below
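
A minimal, self-contained sketch of why the two forms are interchangeable in an equality assertion. It assumes plain strings such as `"t0-0"` as stand-ins for `TopicPartition`, and the class and method names (`CollectToSetExample`, `viaList`, `viaSet`) are hypothetical, not part of the PR:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class CollectToSetExample {

    // Shape used in the PR: collect into a List, then copy it into a HashSet.
    static Set<String> viaList(List<Integer> partitionIndexes) {
        return new HashSet<>(partitionIndexes.stream()
                .map(index -> "t0-" + index) // stand-in for new TopicPartition("t0", index)
                .collect(Collectors.toList()));
    }

    // Suggested shape: collect straight into a Set, skipping the intermediate List.
    static Set<String> viaSet(List<Integer> partitionIndexes) {
        return partitionIndexes.stream()
                .map(index -> "t0-" + index)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<Integer> indexes = Arrays.asList(0, 2);
        Set<String> expected = new HashSet<>(Arrays.asList("t0-0", "t0-2"));
        // Set equality ignores ordering, so both forms compare equal to the expected set.
        System.out.println(expected.equals(viaList(indexes)));
        System.out.println(expected.equals(viaSet(indexes)));
    }
}
```

Since `assertEquals` only needs a `Set` on each side, the concrete implementation returned by `Collectors.toSet()` does not matter here.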



##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java:
##########
@@ -199,6 +210,54 @@ public void testHandleResponseSanityCheck() {
         assertTrue(result.unmappedKeys.isEmpty());
     }
 
+    // This is a more complicated test which ensures that DeleteRecords requests for multiple
+    // leader nodes are correctly divided up among the nodes based on leadership.
+    // node1 leads t0p0 and t0p2, while node2 leads t0p1 and t0p3.
+    @Test
+    public void testBuildRequestMultipleLeaders() {
+        MetadataResponseData metadataResponseData = new MetadataResponseData();
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic();
+        topicMetadata.setName("t0").setErrorCode(Errors.NONE.code());
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(0).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(1).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(2).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code()));
+        topicMetadata.partitions().add(new MetadataResponsePartition()
+                .setPartitionIndex(3).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code()));
+        metadataResponseData.topics().add(topicMetadata);
+        MetadataResponse metadataResponse = new MetadataResponse(metadataResponseData, ApiKeys.METADATA.latestVersion());
+
+        DeleteRecordsHandler handler = new DeleteRecordsHandler(recordsToDelete, logContext, timeout);
+        AdminApiLookupStrategy<TopicPartition> strategy = handler.lookupStrategy();
+        assertInstanceOf(PartitionLeaderStrategy.class, strategy);
+        PartitionLeaderStrategy specificStrategy = (PartitionLeaderStrategy) strategy;
+        MetadataRequest request = specificStrategy.buildRequest(mkSet(t0p0, t0p1, t0p2, t0p3)).build();
+        assertEquals(mkSet("t0"), new HashSet<>(request.topics()));
+
+        Set<TopicPartition> tpSet = mkSet(t0p0, t0p1, t0p2, t0p3);
+        LookupResult<TopicPartition> lookupResult = strategy.handleResponse(tpSet, metadataResponse);
+        assertEquals(emptyMap(), lookupResult.failedKeys);
+        assertEquals(tpSet, lookupResult.mappedKeys.keySet());
+
+        Map<Integer, Set<TopicPartition>> flippedMapping = new HashMap<>();
+        lookupResult.mappedKeys.forEach((tp, node) -> flippedMapping.computeIfAbsent(node, key -> new HashSet<>()).add(tp));
+
+        DeleteRecordsRequest deleteRequest = handler.buildBatchedRequest(node1.id(), flippedMapping.get(1)).build();

Review Comment:
   Can we do `flippedMapping.get(node1.id())`?
   Same below
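
To illustrate the point, here is a rough sketch of inverting a partition-to-leader map and then looking the result up by a named id rather than a literal `1`. It is only an illustration: strings stand in for `TopicPartition`, and `NODE1_ID`, `NODE2_ID`, `groupByLeader`, and the class name are hypothetical stand-ins for `node1.id()`, `node2.id()`, and the test's inline grouping logic:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PartitionsPerBrokerExample {

    // Hypothetical broker ids standing in for node1.id() and node2.id() in the test.
    static final int NODE1_ID = 1;
    static final int NODE2_ID = 2;

    // Inverts a partition -> leader id map into leader id -> set of partitions.
    static Map<Integer, Set<String>> groupByLeader(Map<String, Integer> leaderByPartition) {
        Map<Integer, Set<String>> partitionsPerBroker = new HashMap<>();
        leaderByPartition.forEach((partition, leaderId) ->
                partitionsPerBroker.computeIfAbsent(leaderId, key -> new HashSet<>()).add(partition));
        return partitionsPerBroker;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaderByPartition = new HashMap<>();
        leaderByPartition.put("t0-0", NODE1_ID);
        leaderByPartition.put("t0-1", NODE2_ID);
        leaderByPartition.put("t0-2", NODE1_ID);
        leaderByPartition.put("t0-3", NODE2_ID);

        Map<Integer, Set<String>> partitionsPerBroker = groupByLeader(leaderByPartition);
        // Looking up by the named id keeps the key in sync with the fixture if the id ever changes.
        System.out.println(partitionsPerBroker.get(NODE1_ID));
        System.out.println(partitionsPerBroker.get(NODE2_ID));
    }
}
```

With a literal key, the lookup would silently return `null` if the fixture's node id were changed, whereas the named id follows the fixture.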



##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java:
##########
@@ -63,11 +74,11 @@ public class DeleteRecordsHandlerTest {
     @Test
     public void testBuildRequestSimple() {
         DeleteRecordsHandler handler = new DeleteRecordsHandler(recordsToDelete, logContext, timeout);
-        DeleteRecordsRequest request = handler.buildBatchedRequest(node.id(), mkSet(t0p0, t0p1)).build();
+        DeleteRecordsRequest request = handler.buildBatchedRequest(node1.id(), mkSet(t0p0, t0p1)).build();
         List<DeleteRecordsRequestData.DeleteRecordsTopic> topicPartitions = request.data().topics();

Review Comment:
   While we're fixing this test, can we also rename this variable? Something like `topics` would fit better, I think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
