mimaison commented on code in PR #15479: URL: https://github.com/apache/kafka/pull/15479#discussion_r1515846272
########## clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java: ########## @@ -199,6 +210,54 @@ public void testHandleResponseSanityCheck() { assertTrue(result.unmappedKeys.isEmpty()); } + // This is a more complicated test which ensures that DeleteRecords requests for multiple + // leader nodes are correctly divided up among the nodes based on leadership. + // node1 leads t0p0 and t0p2, while node2 leads t0p1 and t0p3. + @Test + public void testBuildRequestMultipleLeaders() { + MetadataResponseData metadataResponseData = new MetadataResponseData(); + MetadataResponseTopic topicMetadata = new MetadataResponseTopic(); + topicMetadata.setName("t0").setErrorCode(Errors.NONE.code()); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(0).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(1).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(2).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(3).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code())); + metadataResponseData.topics().add(topicMetadata); + MetadataResponse metadataResponse = new MetadataResponse(metadataResponseData, ApiKeys.METADATA.latestVersion()); + + DeleteRecordsHandler handler = new DeleteRecordsHandler(recordsToDelete, logContext, timeout); + AdminApiLookupStrategy<TopicPartition> strategy = handler.lookupStrategy(); + assertInstanceOf(PartitionLeaderStrategy.class, strategy); + PartitionLeaderStrategy specificStrategy = (PartitionLeaderStrategy) strategy; + MetadataRequest request = specificStrategy.buildRequest(mkSet(t0p0, t0p1, t0p2, t0p3)).build(); + assertEquals(mkSet("t0"), new HashSet<>(request.topics())); + + Set<TopicPartition> tpSet 
= mkSet(t0p0, t0p1, t0p2, t0p3); + LookupResult<TopicPartition> lookupResult = strategy.handleResponse(tpSet, metadataResponse); + assertEquals(emptyMap(), lookupResult.failedKeys); + assertEquals(tpSet, lookupResult.mappedKeys.keySet()); + + Map<Integer, Set<TopicPartition>> flippedMapping = new HashMap<>(); Review Comment: What about naming this variable something like `partitionsPerBroker`? ########## clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java: ########## @@ -199,6 +210,54 @@ public void testHandleResponseSanityCheck() { assertTrue(result.unmappedKeys.isEmpty()); } + // This is a more complicated test which ensures that DeleteRecords requests for multiple + // leader nodes are correctly divided up among the nodes based on leadership. + // node1 leads t0p0 and t0p2, while node2 leads t0p1 and t0p3. + @Test + public void testBuildRequestMultipleLeaders() { + MetadataResponseData metadataResponseData = new MetadataResponseData(); + MetadataResponseTopic topicMetadata = new MetadataResponseTopic(); + topicMetadata.setName("t0").setErrorCode(Errors.NONE.code()); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(0).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(1).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(2).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(3).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code())); + metadataResponseData.topics().add(topicMetadata); + MetadataResponse metadataResponse = new MetadataResponse(metadataResponseData, ApiKeys.METADATA.latestVersion()); + + DeleteRecordsHandler handler = new DeleteRecordsHandler(recordsToDelete, logContext, timeout); + 
AdminApiLookupStrategy<TopicPartition> strategy = handler.lookupStrategy(); + assertInstanceOf(PartitionLeaderStrategy.class, strategy); + PartitionLeaderStrategy specificStrategy = (PartitionLeaderStrategy) strategy; + MetadataRequest request = specificStrategy.buildRequest(mkSet(t0p0, t0p1, t0p2, t0p3)).build(); + assertEquals(mkSet("t0"), new HashSet<>(request.topics())); + + Set<TopicPartition> tpSet = mkSet(t0p0, t0p1, t0p2, t0p3); + LookupResult<TopicPartition> lookupResult = strategy.handleResponse(tpSet, metadataResponse); + assertEquals(emptyMap(), lookupResult.failedKeys); + assertEquals(tpSet, lookupResult.mappedKeys.keySet()); + + Map<Integer, Set<TopicPartition>> flippedMapping = new HashMap<>(); + lookupResult.mappedKeys.forEach((tp, node) -> flippedMapping.computeIfAbsent(node, key -> new HashSet<>()).add(tp)); + + DeleteRecordsRequest deleteRequest = handler.buildBatchedRequest(node1.id(), flippedMapping.get(1)).build(); + assertEquals(2, deleteRequest.data().topics().get(0).partitions().size()); + assertEquals(mkSet(t0p0, t0p2), + new HashSet<>(deleteRequest.data().topics().get(0).partitions().stream() + .map(drp -> new TopicPartition("t0", drp.partitionIndex())) + .collect(Collectors.toList()))); Review Comment: This can be simplified into: ``` assertEquals(mkSet(t0p0, t0p2), deleteRequest.data().topics().get(0).partitions().stream() .map(drp -> new TopicPartition("t0", drp.partitionIndex())) .collect(Collectors.toSet())); ``` Same below ########## clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java: ########## @@ -199,6 +210,54 @@ public void testHandleResponseSanityCheck() { assertTrue(result.unmappedKeys.isEmpty()); } + // This is a more complicated test which ensures that DeleteRecords requests for multiple + // leader nodes are correctly divided up among the nodes based on leadership. + // node1 leads t0p0 and t0p2, while node2 leads t0p1 and t0p3. 
+ @Test + public void testBuildRequestMultipleLeaders() { + MetadataResponseData metadataResponseData = new MetadataResponseData(); + MetadataResponseTopic topicMetadata = new MetadataResponseTopic(); + topicMetadata.setName("t0").setErrorCode(Errors.NONE.code()); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(0).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(1).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(2).setLeaderId(node1.id()).setErrorCode(Errors.NONE.code())); + topicMetadata.partitions().add(new MetadataResponsePartition() + .setPartitionIndex(3).setLeaderId(node2.id()).setErrorCode(Errors.NONE.code())); + metadataResponseData.topics().add(topicMetadata); + MetadataResponse metadataResponse = new MetadataResponse(metadataResponseData, ApiKeys.METADATA.latestVersion()); + + DeleteRecordsHandler handler = new DeleteRecordsHandler(recordsToDelete, logContext, timeout); + AdminApiLookupStrategy<TopicPartition> strategy = handler.lookupStrategy(); + assertInstanceOf(PartitionLeaderStrategy.class, strategy); + PartitionLeaderStrategy specificStrategy = (PartitionLeaderStrategy) strategy; + MetadataRequest request = specificStrategy.buildRequest(mkSet(t0p0, t0p1, t0p2, t0p3)).build(); + assertEquals(mkSet("t0"), new HashSet<>(request.topics())); + + Set<TopicPartition> tpSet = mkSet(t0p0, t0p1, t0p2, t0p3); + LookupResult<TopicPartition> lookupResult = strategy.handleResponse(tpSet, metadataResponse); + assertEquals(emptyMap(), lookupResult.failedKeys); + assertEquals(tpSet, lookupResult.mappedKeys.keySet()); + + Map<Integer, Set<TopicPartition>> flippedMapping = new HashMap<>(); + lookupResult.mappedKeys.forEach((tp, node) -> flippedMapping.computeIfAbsent(node, key -> new HashSet<>()).add(tp)); + + DeleteRecordsRequest 
deleteRequest = handler.buildBatchedRequest(node1.id(), flippedMapping.get(1)).build(); Review Comment: Can we do `flippedMapping.get(node1.id())`? Same below ########## clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandlerTest.java: ########## @@ -63,11 +74,11 @@ public class DeleteRecordsHandlerTest { @Test public void testBuildRequestSimple() { DeleteRecordsHandler handler = new DeleteRecordsHandler(recordsToDelete, logContext, timeout); - DeleteRecordsRequest request = handler.buildBatchedRequest(node.id(), mkSet(t0p0, t0p1)).build(); + DeleteRecordsRequest request = handler.buildBatchedRequest(node1.id(), mkSet(t0p0, t0p1)).build(); List<DeleteRecordsRequestData.DeleteRecordsTopic> topicPartitions = request.data().topics(); Review Comment: While we're fixing this test, can we also rename this variable? Something like `topics` would fit better I think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org