dajac commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r664469428



##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +471,437 @@ public void execute(Admin admin, Namespace ns, 
PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the 
scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception {
+            Optional<Integer> brokerId = 
Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = 
Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either 
--topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = 
Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic 
to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = 
collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = 
collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId 
= groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = 
describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = 
filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new 
TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, 
openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding 
to the
+                    // producerId of an open transaction, then the transaction 
is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction 
state.
+                    TransactionDescription description = 
descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : 
openTransactions) {
+                            // The `DescribeTransactions` API returns all 
partitions being
+                            // written to in an ongoing transaction and any 
partition which
+                            // does not yet have markers written when in the 
`PendingAbort` or
+                            // `PendingCommit` states. If the topic partition 
that we found is
+                            // among these, then we can still expect the 
coordinator to write
+                            // the marker. Otherwise, it is a hanging 
transaction.
+                            if 
(!description.topicPartitions().contains(openTransaction.topicPartition)) {
+                                hangingTransactions.add(openTransaction);
+                            }
+                        }
+                    }
+                }
+            });
+
+            return hangingTransactions;
+        }
+
+        private void printHangingTransactions(
+            List<OpenTransaction> hangingTransactions,
+            PrintStream out
+        ) {
+            long currentTimeMs = time.milliseconds();
+            List<String[]> rows = new ArrayList<>(hangingTransactions.size());
+
+            for (OpenTransaction transaction : hangingTransactions) {
+                long transactionDurationMinutes = 
TimeUnit.MILLISECONDS.toMinutes(
+                    currentTimeMs - transaction.producerState.lastTimestamp());
+
+                rows.add(new String[] {
+                    transaction.topicPartition.topic(),
+                    String.valueOf(transaction.topicPartition.partition()),
+                    String.valueOf(transaction.producerState.producerId()),
+                    String.valueOf(transaction.producerState.producerEpoch()),
+                    
String.valueOf(transaction.producerState.coordinatorEpoch().orElse(-1)),
+                    
String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)),
+                    String.valueOf(transaction.producerState.lastTimestamp()),
+                    String.valueOf(transactionDurationMinutes)
+                });
+            }
+
+            prettyPrintTable(HEADERS, rows, out);
+        }
+
+        private Map<String, TransactionDescription> describeTransactions(
+            Admin admin,
+            Collection<String> transactionalIds
+        ) throws Exception {
+            try {
+                DescribeTransactionsResult result = 
admin.describeTransactions(new HashSet<>(transactionalIds));
+                Map<String, TransactionDescription> descriptions = new 
HashMap<>();
+
+                for (String transactionalId : transactionalIds) {
+                    try {
+                        TransactionDescription description = 
result.description(transactionalId).get();
+                        descriptions.put(transactionalId, description);
+                    } catch (ExecutionException e) {
+                        if (e.getCause() instanceof 
TransactionalIdNotFoundException) {
+                            descriptions.put(transactionalId, null);
+                        } else {
+                            throw e;
+                        }
+                    }
+                }
+
+                return descriptions;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + 
transactionalIds.size()
+                    + " transactions", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+        private Map<Long, List<OpenTransaction>> groupByProducerId(
+            List<OpenTransaction> openTransactions
+        ) {
+            Map<Long, List<OpenTransaction>> res = new HashMap<>();
+            for (OpenTransaction transaction : openTransactions) {
+                List<OpenTransaction> states = res.computeIfAbsent(
+                    transaction.producerState.producerId(),
+                    __ -> new ArrayList<>()
+                );
+                states.add(transaction);
+            }
+            return res;
+        }
+
+        private List<String> listTopics(
+            Admin admin
+        ) throws Exception {
+            try {
+                return new ArrayList<>(admin.listTopics().names().get());
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list topics", e.getCause());
+                return Collections.emptyList();
+            }
+        }
+
+        private List<TopicPartition> findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics
+        ) throws Exception {
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
+                findTopicPartitions(
+                    admin,
+                    brokerId,
+                    batch,
+                    topicPartitions
+                );
+            });
+            return topicPartitions;
+        }
+
+        private void findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            try {
+                Map<String, TopicDescription> topicDescriptions = 
admin.describeTopics(topics).all().get();
+                topicDescriptions.forEach((topic, description) -> {
+                    description.partitions().forEach(partitionInfo -> {
+                        if (!brokerId.isPresent() || 
hasReplica(brokerId.get(), partitionInfo)) {
+                            topicPartitions.add(new TopicPartition(topic, 
partitionInfo.partition()));
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + topics.size() + " 
topics", e.getCause());
+            }
+        }
+
+        private boolean hasReplica(
+            int brokerId,
+            TopicPartitionInfo partitionInfo
+        ) {
+            return partitionInfo.replicas().stream().anyMatch(node -> 
node.id() == brokerId);
+        }
+
+        private List<OpenTransaction> collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            // We have to check all partitions on the broker. In order to avoid
+            // overwhelming it with a giant request, we break the requests into
+            // smaller batches.
+
+            List<OpenTransaction> candidateTransactions = new ArrayList<>();
+
+            consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
+                collectCandidateOpenTransactions(
+                    admin,
+                    brokerId,
+                    maxTransactionTimeoutMs,
+                    batch,
+                    candidateTransactions
+                );
+            });
+
+            return candidateTransactions;
+        }
+
+        @FunctionalInterface
+        private interface ThrowableConsumer<T> {
+            void accept(T t) throws Exception;
+        }

Review comment:
       nit: I would move the `ThrowableConsumer` interface next to 
`consumeInBatches`, as they are used together.

##########
File path: 
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +444,532 @@ public void 
testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(new ListTransactionsOptions(), 
listingsByBroker);
+    }
+
+    private void expectListTransactions(
+        ListTransactionsOptions options,
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        ListTransactionsResult listResult = 
Mockito.mock(ListTransactionsResult.class);
+        Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
+
+        List<TransactionListing> allListings = new ArrayList<>();
+        listingsByBroker.values().forEach(allListings::addAll);
+
+        
Mockito.when(listResult.all()).thenReturn(completedFuture(allListings));
+        
Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
+    }
+
+    private void expectDescribeProducers(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        long lastTimestamp,
+        OptionalInt coordinatorEpoch,
+        OptionalLong txnStartOffset
+    ) {
+        PartitionProducerState partitionProducerState = new 
PartitionProducerState(singletonList(
+            new ProducerState(
+                producerId,
+                producerEpoch,
+                500,
+                lastTimestamp,
+                coordinatorEpoch,
+                txnStartOffset
+            )
+        ));
+
+        DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(
+            completedFuture(singletonMap(topicPartition, 
partitionProducerState))
+        );
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(topicPartition),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+    }
+
+    private void expectDescribeTransactions(
+        Map<String, TransactionDescription> descriptions
+    ) {
+        DescribeTransactionsResult result = 
Mockito.mock(DescribeTransactionsResult.class);
+        descriptions.forEach((transactionalId, description) -> {
+            Mockito.when(result.description(transactionalId))
+                .thenReturn(completedFuture(description));
+        });
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        
Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
+    }
+
+    private void expectListTopics(
+        Set<String> topics
+    ) {
+        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
+        Mockito.when(result.names()).thenReturn(completedFuture(topics));
+        Mockito.when(admin.listTopics()).thenReturn(result);
+    }
+
+    private void expectDescribeTopics(
+        Map<String, TopicDescription> descriptions
+    ) {
+        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTopics(new 
ArrayList<>(descriptions.keySet()))).thenReturn(result);
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForBroker() throws 
Exception {
+        int brokerId = 5;
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            String.valueOf(brokerId)
+        };
+
+        String topic = "foo";
+        expectListTopics(singleton(topic));
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = 
asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForTopic() throws 
Exception {
+        String topic = "foo";
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topic
+        };
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Arrays.asList(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1)),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = 
asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingSpecifiedTopicPartition() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds();
+        OptionalInt coordinatorEpoch = OptionalInt.of(19);
+        OptionalLong txnStartOffset = OptionalLong.of(29384L);
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            coordinatorEpoch,
+            txnStartOffset
+        );
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = 
asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingNoMappedTransactionalId() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds() - 
TimeUnit.MINUTES.toMillis(60);
+        int coordinatorEpoch = 19;
+        long txnStartOffset = 29384L;
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            OptionalInt.of(coordinatorEpoch),
+            OptionalLong.of(txnStartOffset)
+        );
+
+        expectListTransactions(
+            new 
ListTransactionsOptions().filterProducerIds(singleton(producerId)),
+            singletonMap(1, Collections.emptyList())
+        );
+
+        expectDescribeTransactions(Collections.emptyMap());
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(2, table.size());
+
+        List<String> expectedHeaders = 
asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+
+        List<String> expectedRow = asList(
+            topicPartition.topic(),
+            String.valueOf(topicPartition.partition()),
+            String.valueOf(producerId),
+            String.valueOf(producerEpoch),
+            String.valueOf(coordinatorEpoch),
+            String.valueOf(txnStartOffset),
+            String.valueOf(lastTimestamp),
+            "60"
+        );
+        assertEquals(expectedRow, table.get(1));

Review comment:
       nit: We could also define a helper method for building and asserting the 
expected row, to avoid the code repetition.

##########
File path: 
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +444,532 @@ public void 
testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws 
Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(new ListTransactionsOptions(), 
listingsByBroker);
+    }
+
+    private void expectListTransactions(
+        ListTransactionsOptions options,
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        ListTransactionsResult listResult = 
Mockito.mock(ListTransactionsResult.class);
+        Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
+
+        List<TransactionListing> allListings = new ArrayList<>();
+        listingsByBroker.values().forEach(allListings::addAll);
+
+        
Mockito.when(listResult.all()).thenReturn(completedFuture(allListings));
+        
Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
+    }
+
+    private void expectDescribeProducers(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        long lastTimestamp,
+        OptionalInt coordinatorEpoch,
+        OptionalLong txnStartOffset
+    ) {
+        PartitionProducerState partitionProducerState = new 
PartitionProducerState(singletonList(
+            new ProducerState(
+                producerId,
+                producerEpoch,
+                500,
+                lastTimestamp,
+                coordinatorEpoch,
+                txnStartOffset
+            )
+        ));
+
+        DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(
+            completedFuture(singletonMap(topicPartition, 
partitionProducerState))
+        );
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(topicPartition),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+    }
+
+    private void expectDescribeTransactions(
+        Map<String, TransactionDescription> descriptions
+    ) {
+        DescribeTransactionsResult result = 
Mockito.mock(DescribeTransactionsResult.class);
+        descriptions.forEach((transactionalId, description) -> {
+            Mockito.when(result.description(transactionalId))
+                .thenReturn(completedFuture(description));
+        });
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        
Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
+    }
+
+    private void expectListTopics(
+        Set<String> topics
+    ) {
+        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
+        Mockito.when(result.names()).thenReturn(completedFuture(topics));
+        Mockito.when(admin.listTopics()).thenReturn(result);
+    }
+
+    private void expectDescribeTopics(
+        Map<String, TopicDescription> descriptions
+    ) {
+        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTopics(new 
ArrayList<>(descriptions.keySet()))).thenReturn(result);
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForBroker() throws 
Exception {
+        int brokerId = 5;
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            String.valueOf(brokerId)
+        };
+
+        String topic = "foo";
+        expectListTopics(singleton(topic));
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = 
Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = 
asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));

Review comment:
       nit: This block is repeated in many tests. I wonder if we could define 
a helper for it.

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +471,437 @@ public void execute(Admin admin, Namespace ns, 
PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the 
scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception {
+            Optional<Integer> brokerId = 
Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = 
Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either 
--topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = 
Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic 
to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = 
collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = 
collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId 
= groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = 
describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = 
filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new 
TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, 
openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding 
to the
+                    // producerId of an open transaction, then the transaction 
is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction 
state.
+                    TransactionDescription description = 
descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : 
openTransactions) {
+                            // The `DescribeTransactions` API returns all 
partitions being
+                            // written to in an ongoing transaction and any 
partition which
+                            // does not yet have markers written when in the 
`PendingAbort` or
+                            // `PendingCommit` states. If the topic partition 
that we found is
+                            // among these, then we can still expect the 
coordinator to write
+                            // the marker. Otherwise, it is a hanging 
transaction.
+                            if 
(!description.topicPartitions().contains(openTransaction.topicPartition)) {
+                                hangingTransactions.add(openTransaction);
+                            }
+                        }
+                    }
+                }
+            });
+
+            return hangingTransactions;
+        }
+
+        private void printHangingTransactions(
+            List<OpenTransaction> hangingTransactions,
+            PrintStream out
+        ) {
+            long currentTimeMs = time.milliseconds();
+            List<String[]> rows = new ArrayList<>(hangingTransactions.size());
+
+            for (OpenTransaction transaction : hangingTransactions) {
+                long transactionDurationMinutes = 
TimeUnit.MILLISECONDS.toMinutes(
+                    currentTimeMs - transaction.producerState.lastTimestamp());
+
+                rows.add(new String[] {
+                    transaction.topicPartition.topic(),
+                    String.valueOf(transaction.topicPartition.partition()),
+                    String.valueOf(transaction.producerState.producerId()),
+                    String.valueOf(transaction.producerState.producerEpoch()),
+                    
String.valueOf(transaction.producerState.coordinatorEpoch().orElse(-1)),
+                    
String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)),
+                    String.valueOf(transaction.producerState.lastTimestamp()),
+                    String.valueOf(transactionDurationMinutes)
+                });
+            }
+
+            prettyPrintTable(HEADERS, rows, out);
+        }
+
+        private Map<String, TransactionDescription> describeTransactions(
+            Admin admin,
+            Collection<String> transactionalIds
+        ) throws Exception {
+            try {
+                DescribeTransactionsResult result = 
admin.describeTransactions(new HashSet<>(transactionalIds));
+                Map<String, TransactionDescription> descriptions = new 
HashMap<>();
+
+                for (String transactionalId : transactionalIds) {
+                    try {
+                        TransactionDescription description = 
result.description(transactionalId).get();
+                        descriptions.put(transactionalId, description);
+                    } catch (ExecutionException e) {
+                        if (e.getCause() instanceof 
TransactionalIdNotFoundException) {
+                            descriptions.put(transactionalId, null);
+                        } else {
+                            throw e;
+                        }
+                    }
+                }
+
+                return descriptions;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + 
transactionalIds.size()
+                    + " transactions", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+        private Map<Long, List<OpenTransaction>> groupByProducerId(
+            List<OpenTransaction> openTransactions
+        ) {
+            Map<Long, List<OpenTransaction>> res = new HashMap<>();
+            for (OpenTransaction transaction : openTransactions) {
+                List<OpenTransaction> states = res.computeIfAbsent(
+                    transaction.producerState.producerId(),
+                    __ -> new ArrayList<>()
+                );
+                states.add(transaction);
+            }
+            return res;
+        }
+
+        private List<String> listTopics(
+            Admin admin
+        ) throws Exception {
+            try {
+                return new ArrayList<>(admin.listTopics().names().get());
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list topics", e.getCause());
+                return Collections.emptyList();
+            }
+        }
+
+        private List<TopicPartition> findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics
+        ) throws Exception {
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
+                findTopicPartitions(
+                    admin,
+                    brokerId,
+                    batch,
+                    topicPartitions
+                );
+            });
+            return topicPartitions;
+        }
+
+        private void findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            try {
+                Map<String, TopicDescription> topicDescriptions = 
admin.describeTopics(topics).all().get();
+                topicDescriptions.forEach((topic, description) -> {
+                    description.partitions().forEach(partitionInfo -> {
+                        if (!brokerId.isPresent() || 
hasReplica(brokerId.get(), partitionInfo)) {
+                            topicPartitions.add(new TopicPartition(topic, 
partitionInfo.partition()));
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + topics.size() + " 
topics", e.getCause());
+            }
+        }
+
+        private boolean hasReplica(
+            int brokerId,
+            TopicPartitionInfo partitionInfo
+        ) {
+            return partitionInfo.replicas().stream().anyMatch(node -> 
node.id() == brokerId);
+        }
+
+        private List<OpenTransaction> collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            // We have to check all partitions on the broker. In order to avoid
+            // overwhelming it with a giant request, we break the requests into
+            // smaller batches.
+
+            List<OpenTransaction> candidateTransactions = new ArrayList<>();
+
+            consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
+                collectCandidateOpenTransactions(
+                    admin,
+                    brokerId,
+                    maxTransactionTimeoutMs,
+                    batch,
+                    candidateTransactions
+                );
+            });
+
+            return candidateTransactions;
+        }
+
+        @FunctionalInterface
+        private interface ThrowableConsumer<T> {
+            void accept(T t) throws Exception;
+        }
+
+        private static class OpenTransaction {
+            private final TopicPartition topicPartition;
+            private final ProducerState producerState;
+
+            private OpenTransaction(
+                TopicPartition topicPartition,
+                ProducerState producerState
+            ) {
+                this.topicPartition = topicPartition;
+                this.producerState = producerState;
+            }
+        }
+
+        private void collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions,
+            List<OpenTransaction> candidateTransactions
+        ) throws Exception {
+            try {
+                DescribeProducersOptions describeOptions = new 
DescribeProducersOptions();
+                brokerId.ifPresent(describeOptions::brokerId);
+
+                Map<TopicPartition, 
DescribeProducersResult.PartitionProducerState> producersByPartition =
+                    admin.describeProducers(topicPartitions, 
describeOptions).all().get();
+
+                long currentTimeMs = time.milliseconds();
+
+                producersByPartition.forEach((topicPartition, producersStates) 
-> {
+                    producersStates.activeProducers().forEach(activeProducer 
-> {
+                        if 
(activeProducer.currentTransactionStartOffset().isPresent()) {
+                            long transactionDurationMs = currentTimeMs - 
activeProducer.lastTimestamp();
+                            if (transactionDurationMs > 
maxTransactionTimeoutMs) {
+                                candidateTransactions.add(new OpenTransaction(
+                                    topicPartition,
+                                    activeProducer
+                                ));
+                            }
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe producers for " + 
topicPartitions.size() +
+                    " partitions on broker " + brokerId, e.getCause());
+            }
+        }
+
+        private Map<Long, String> lookupTransactionalIds(
+            Admin admin,
+            Set<Long> producerIds
+        ) throws Exception {
+            try {
+                ListTransactionsOptions listTransactionsOptions = new 
ListTransactionsOptions()
+                    .filterProducerIds(producerIds);
+
+                Collection<TransactionListing> transactionListings =
+                    
admin.listTransactions(listTransactionsOptions).all().get();
+
+                Map<Long, String> transactionalIdMap = new HashMap<>();
+
+                transactionListings.forEach(listing -> {
+                    if (!producerIds.contains(listing.producerId())) {
+                        log.debug("Received transaction listing {} which has a 
producerId " +
+                            "which was not requested", listing);
+                    } else {
+                        transactionalIdMap.put(
+                            listing.producerId(),
+                            listing.transactionalId()
+                        );
+                    }
+                });
+
+                return transactionalIdMap;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list transactions for " + 
producerIds.size() +
+                    " producers", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+

Review comment:
       nit: This empty line could be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to