showuon commented on a change in pull request #10974: URL: https://github.com/apache/kafka/pull/10974#discussion_r664276058
########## File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java ########## @@ -461,6 +471,437 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception } } + static class FindHangingTransactionsCommand extends TransactionsCommand { + private static final int MAX_BATCH_SIZE = 500; + + static final String[] HEADERS = new String[] { + "Topic", + "Partition", + "ProducerId", + "ProducerEpoch", + "CoordinatorEpoch", + "StartOffset", + "LastTimestamp", + "Duration(min)" + }; + + FindHangingTransactionsCommand(Time time) { + super(time); + } + + @Override + String name() { + return "find-hanging"; + } + + @Override + void addSubparser(Subparsers subparsers) { + Subparser subparser = subparsers.addParser(name()) + .help("find hanging transactions"); + + subparser.addArgument("--broker-id") + .help("broker id to search for hanging transactions") + .action(store()) + .type(Integer.class) + .required(false); + + subparser.addArgument("--max-transaction-timeout") + .help("maximum transaction timeout in minutes to limit the scope of the search") + .action(store()) + .type(Integer.class) + .setDefault(15) + .required(false); + + subparser.addArgument("--topic") + .help("topic name to limit search to") Review comment: nit: "topic name to limit search to. REQUIRED if --partition is specified." ########## File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java ########## @@ -461,6 +471,437 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception } } + static class FindHangingTransactionsCommand extends TransactionsCommand { + private static final int MAX_BATCH_SIZE = 500; + + static final String[] HEADERS = new String[] { + "Topic", + "Partition", + "ProducerId", + "ProducerEpoch", + "CoordinatorEpoch", + "StartOffset", + "LastTimestamp", + "Duration(min)" + }; + + FindHangingTransactionsCommand(Time time) { + super(time); + } + + @Override + String name() { + return "find-hanging"; + } + + @Override + void addSubparser(Subparsers subparsers) { + Subparser subparser = subparsers.addParser(name()) + .help("find hanging transactions"); + + subparser.addArgument("--broker-id") + .help("broker id to search for hanging transactions") + .action(store()) + .type(Integer.class) + .required(false); + + subparser.addArgument("--max-transaction-timeout") + .help("maximum transaction timeout in minutes to limit the scope of the search") Review comment: Could we also mention that default is 15 mins if not set? ########## File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java ########## @@ -461,6 +471,437 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception } } + static class FindHangingTransactionsCommand extends TransactionsCommand { + private static final int MAX_BATCH_SIZE = 500; + + static final String[] HEADERS = new String[] { + "Topic", + "Partition", + "ProducerId", + "ProducerEpoch", + "CoordinatorEpoch", + "StartOffset", + "LastTimestamp", + "Duration(min)" + }; + + FindHangingTransactionsCommand(Time time) { + super(time); + } + + @Override + String name() { + return "find-hanging"; + } + + @Override + void addSubparser(Subparsers subparsers) { + Subparser subparser = subparsers.addParser(name()) + .help("find hanging transactions"); + + subparser.addArgument("--broker-id") + .help("broker id to search for hanging transactions") + .action(store()) + .type(Integer.class) + .required(false); + + subparser.addArgument("--max-transaction-timeout") + .help("maximum transaction timeout in minutes to limit the scope of the search") + .action(store()) + .type(Integer.class) + .setDefault(15) + .required(false); + + subparser.addArgument("--topic") + .help("topic name to limit search to") + .action(store()) + .type(String.class) + .required(false); + + subparser.addArgument("--partition") + .help("partition number") + .action(store()) + .type(Integer.class) + .required(false); + } + + @Override + void execute(Admin admin, Namespace ns, PrintStream out) throws Exception { + Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id")); + Optional<String> topic = Optional.ofNullable(ns.getString("topic")); + + if (!topic.isPresent() && !brokerId.isPresent()) { + printErrorAndExit("The `find-hanging` command requires either --topic " + + "or --broker-id to limit the scope of the search"); Review comment: Could we handle the case that both `topic` and `brokerId` are provided? Will it turn out that we tried to find the specific topic but not in the brokerId? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org