hachikuji commented on a change in pull request #10974: URL: https://github.com/apache/kafka/pull/10974#discussion_r664159092
########## File path: tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java ########## @@ -437,6 +442,536 @@ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin assertNormalExit(); } + @Test + public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception { + assertCommandFailure(new String[]{ + "--bootstrap-server", + "localhost:9092", + "find-hanging" + }); + } + + @Test + public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception { + assertCommandFailure(new String[]{ + "--bootstrap-server", + "localhost:9092", + "find-hanging", + "--broker-id", + "0", + "--partition", + "5" + }); + } + + private void expectListTransactions( + Map<Integer, Collection<TransactionListing>> listingsByBroker + ) { + expectListTransactions(null, listingsByBroker); + } + + private void expectListTransactions( + ListTransactionsOptions options, + Map<Integer, Collection<TransactionListing>> listingsByBroker + ) { + ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class); + + if (options == null) { + Mockito.when(admin.listTransactions()).thenReturn(listResult); + } else { + Mockito.when(admin.listTransactions(options)).thenReturn(listResult); + } + + List<TransactionListing> allListings = new ArrayList<>(); + listingsByBroker.values().forEach(allListings::addAll); + + Mockito.when(listResult.all()).thenReturn(completedFuture(allListings)); + Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker)); + } + + private void expectDescribeProducers( + TopicPartition topicPartition, + long producerId, + short producerEpoch, + long lastTimestamp, + OptionalInt coordinatorEpoch, + OptionalLong txnStartOffset + ) { + PartitionProducerState partitionProducerState = new PartitionProducerState(singletonList( + new ProducerState( + producerId, + producerEpoch, + 500, + lastTimestamp, + coordinatorEpoch, + txnStartOffset + ) + )); + + DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class); + Mockito.when(result.all()).thenReturn( + completedFuture(singletonMap(topicPartition, partitionProducerState)) + ); + + Mockito.when(admin.describeProducers( + Collections.singletonList(topicPartition), + new DescribeProducersOptions() + )).thenReturn(result); + } + + private void expectDescribeTransactions( + Map<String, TransactionDescription> descriptions + ) { + DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class); + Mockito.when(result.all()).thenReturn(completedFuture(descriptions)); + Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result); + } + + private void expectListTopics( + Set<String> topics + ) { + ListTopicsResult result = Mockito.mock(ListTopicsResult.class); + Mockito.when(result.names()).thenReturn(completedFuture(topics)); + Mockito.when(admin.listTopics()).thenReturn(result); + } + + private void expectDescribeTopics( + Map<String, TopicDescription> descriptions + ) { + DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class); + Mockito.when(result.all()).thenReturn(completedFuture(descriptions)); + Mockito.when(admin.describeTopics(new ArrayList<>(descriptions.keySet()))).thenReturn(result); + } + + @Test + public void testFindHangingLookupTopicPartitionsForBroker() throws Exception { + int brokerId = 5; + + String[] args = new String[]{ + "--bootstrap-server", + "localhost:9092", + "find-hanging", + "--broker-id", + String.valueOf(brokerId) + }; + + String topic = "foo"; + expectListTopics(singleton(topic)); + + Node node0 = new Node(0, "localhost", 9092); + Node node1 = new Node(1, "localhost", 9093); + Node node5 = new Node(5, "localhost", 9097); + + TopicPartitionInfo partition0 = new TopicPartitionInfo( + 0, + node0, + Arrays.asList(node0, node1), + Arrays.asList(node0, node1) + ); + TopicPartitionInfo partition1 = new TopicPartitionInfo( + 1, + node1, + Arrays.asList(node1, node5), + Arrays.asList(node1, node5) + ); + + 
TopicDescription description = new TopicDescription( + topic, + false, + Arrays.asList(partition0, partition1) + ); + expectDescribeTopics(singletonMap(topic, description)); + + DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class); + Mockito.when(result.all()).thenReturn(completedFuture(emptyMap())); + + Mockito.when(admin.describeProducers( + Collections.singletonList(new TopicPartition(topic, 1)), + new DescribeProducersOptions().brokerId(brokerId) + )).thenReturn(result); + + execute(args); + assertNormalExit(); + + List<List<String>> table = readOutputAsTable(); + assertEquals(1, table.size()); + + List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS); + assertEquals(expectedHeaders, table.get(0)); + } + + @Test + public void testFindHangingLookupTopicPartitionsForTopic() throws Exception { + String topic = "foo"; + + String[] args = new String[]{ + "--bootstrap-server", + "localhost:9092", + "find-hanging", + "--topic", + topic + }; + + Node node0 = new Node(0, "localhost", 9092); + Node node1 = new Node(1, "localhost", 9093); + Node node5 = new Node(5, "localhost", 9097); + + TopicPartitionInfo partition0 = new TopicPartitionInfo( + 0, + node0, + Arrays.asList(node0, node1), + Arrays.asList(node0, node1) + ); + TopicPartitionInfo partition1 = new TopicPartitionInfo( + 1, + node1, + Arrays.asList(node1, node5), + Arrays.asList(node1, node5) + ); + + TopicDescription description = new TopicDescription( + topic, + false, + Arrays.asList(partition0, partition1) + ); + expectDescribeTopics(singletonMap(topic, description)); + + DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class); + Mockito.when(result.all()).thenReturn(completedFuture(emptyMap())); + + Mockito.when(admin.describeProducers( + Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)), + new DescribeProducersOptions() + )).thenReturn(result); + + execute(args); + assertNormalExit(); + + 
List<List<String>> table = readOutputAsTable(); + assertEquals(1, table.size()); + + List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS); + assertEquals(expectedHeaders, table.get(0)); + } + + @Test + public void testFindHangingSpecifiedTopicPartition() throws Exception { + TopicPartition topicPartition = new TopicPartition("foo", 5); + + String[] args = new String[]{ + "--bootstrap-server", + "localhost:9092", + "find-hanging", + "--topic", + topicPartition.topic(), + "--partition", + String.valueOf(topicPartition.partition()) + }; + + long producerId = 132L; + short producerEpoch = 5; + long lastTimestamp = time.milliseconds(); + OptionalInt coordinatorEpoch = OptionalInt.of(19); + OptionalLong txnStartOffset = OptionalLong.of(29384L); + + expectDescribeProducers( + topicPartition, + producerId, + producerEpoch, + lastTimestamp, + coordinatorEpoch, + txnStartOffset + ); + + execute(args); + assertNormalExit(); + + List<List<String>> table = readOutputAsTable(); + assertEquals(1, table.size()); + + List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS); + assertEquals(expectedHeaders, table.get(0)); + } + + @Test + public void testFindHangingNoMappedTransactionalId() throws Exception { Review comment: Yeah. In fact, the `describeTransactions` API would return `TRANSACTIONAL_ID_NOT_FOUND`, so I needed to do a little refactoring to handle this properly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org