hachikuji commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r664154672



##########
File path: 
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void 
testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(null, listingsByBroker);

Review comment:
       Yeah, I was trying to deal with the fact that `TransactionsCommand` 
invokes both variants of `listTransactions`. I dealt with it by modifying the 
no-arg call to provide the options explicitly.

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, 
PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the 
scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception {
+            Optional<Integer> brokerId = 
Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = 
Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either 
--topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = 
Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic 
to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = 
collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = 
collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId 
= groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = 
describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = 
filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new 
TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, 
openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding 
to the
+                    // producerId of an open transaction, then the transaction 
is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction 
state
+                    TransactionDescription description = 
descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : 
openTransactions) {
+                            if (description.producerEpoch() > 
openTransaction.producerState.producerEpoch()
+                                || 
!description.topicPartitions().contains(openTransaction.topicPartition)) {

Review comment:
       Thanks, added a comment. I decided to remove the epoch check. The 
producer epoch could be bumped as a result of a transaction timeout, so the 
check did not seem quite right.

##########
File path: 
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void 
testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(null, listingsByBroker);
+    }
+
+    private void expectListTransactions(
+        ListTransactionsOptions options,
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        ListTransactionsResult listResult = 
Mockito.mock(ListTransactionsResult.class);
+
+        if (options == null) {
+            Mockito.when(admin.listTransactions()).thenReturn(listResult);
+        } else {
+            
Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
+        }
+
+        List<TransactionListing> allListings = new ArrayList<>();
+        listingsByBroker.values().forEach(allListings::addAll);
+
+        
Mockito.when(listResult.all()).thenReturn(completedFuture(allListings));
+        
Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
+    }
+
+    private void expectDescribeProducers(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        long lastTimestamp,
+        OptionalInt coordinatorEpoch,
+        OptionalLong txnStartOffset
+    ) {
+        PartitionProducerState partitionProducerState = new 
PartitionProducerState(singletonList(
+            new ProducerState(
+                producerId,
+                producerEpoch,
+                500,
+                lastTimestamp,
+                coordinatorEpoch,
+                txnStartOffset
+            )
+        ));
+
+        DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(
+            completedFuture(singletonMap(topicPartition, 
partitionProducerState))
+        );
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(topicPartition),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+    }
+
+    private void expectDescribeTransactions(
+        Map<String, TransactionDescription> descriptions
+    ) {
+        DescribeTransactionsResult result = 
Mockito.mock(DescribeTransactionsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        
Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
+    }
+
+    private void expectListTopics(
+        Set<String> topics
+    ) {
+        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
+        Mockito.when(result.names()).thenReturn(completedFuture(topics));
+        Mockito.when(admin.listTopics()).thenReturn(result);
+    }
+
+    private void expectDescribeTopics(
+        Map<String, TopicDescription> descriptions
+    ) {
+        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTopics(new 
ArrayList<>(descriptions.keySet()))).thenReturn(result);
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForBroker() throws 
Exception {
+        int brokerId = 5;
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            String.valueOf(brokerId)
+        };
+
+        String topic = "foo";
+        expectListTopics(singleton(topic));
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = 
asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForTopic() throws 
Exception {
+        String topic = "foo";
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topic
+        };
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Arrays.asList(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1)),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = 
asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingSpecifiedTopicPartition() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds();
+        OptionalInt coordinatorEpoch = OptionalInt.of(19);
+        OptionalLong txnStartOffset = OptionalLong.of(29384L);
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            coordinatorEpoch,
+            txnStartOffset
+        );
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = 
asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingNoMappedTransactionalId() throws Exception {

Review comment:
       Yeah. In fact, the `describeTransactions` API would return 
`TRANSACTIONAL_ID_NOT_FOUND`, so I needed to do a little refactoring to handle 
this properly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to