Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan merged PR #15384: URL: https://github.com/apache/kafka/pull/15384 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on PR #15384: URL: https://github.com/apache/kafka/pull/15384#issuecomment-1960018327 Something wrong with the build so I will restart.
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1499709434

## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala: ##
@@ -513,12 +514,16 @@ class TransactionStateManagerTest {
     putTransaction(transactionalId = "t0", producerId = 0, state = Ongoing)
     putTransaction(transactionalId = "t1", producerId = 1, state = Ongoing)
+    // update time to create transactions with various durations
+    time.sleep(1000)
     putTransaction(transactionalId = "t2", producerId = 2, state = PrepareCommit)
     putTransaction(transactionalId = "t3", producerId = 3, state = PrepareAbort)
+    time.sleep(1000)

Review Comment:
We should always use mock time :)
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1499702168

## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala: ##
@@ -527,16 +532,20 @@ class TransactionStateManagerTest {
     def assertListTransactions(
       expectedTransactionalIds: Set[String],
       filterProducerIds: Set[Long] = Set.empty,
-      filterStates: Set[String] = Set.empty
+      filterStates: Set[String] = Set.empty,
+      filterDuration: Long = -1L
     ): Unit = {
-      val listResponse = transactionManager.listTransactionStates(filterProducerIds, filterStates)
+      val listResponse = transactionManager.listTransactionStates(filterProducerIds, filterStates, filterDuration)
       assertEquals(Errors.NONE, Errors.forCode(listResponse.errorCode))
       assertEquals(expectedTransactionalIds, listResponse.transactionStates.asScala.map(_.transactionalId).toSet)
       val expectedUnknownStates = filterStates.filter(state => TransactionState.fromName(state).isEmpty)
       assertEquals(expectedUnknownStates, listResponse.unknownStateFilters.asScala.toSet)
     }
-    assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7"))
+    assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7"), filterDuration = 0L)
+    assertListTransactions(Set("t0", "t1", "t2", "t3"), filterDuration = 1000L)
+    assertListTransactions(Set("t0", "t1"), filterDuration = 2000L)

Review Comment:
Their durations are 2000L so they will be filtered out.
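Editor's note: the filter semantics discussed in this comment can be sketched in plain Java. This is a minimal illustration, not the broker code: `DurationFilterSketch`, `listTransactions`, and `sampleStartTimes` are hypothetical names, and the timeline (t4..t7 started at 2000 ms, "now" at 3000 ms) is an assumption, since the quoted hunk does not show those lines. It assumes a strict "running longer than" comparison, with a negative filter meaning "return everything", as the help text elsewhere in this thread describes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class DurationFilterSketch {

    /**
     * Returns the ids of transactions that have been running strictly longer
     * than filterMs. A negative filterMs disables the duration check.
     */
    static Set<String> listTransactions(Map<String, Long> startTimesMs, long nowMs, long filterMs) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Long> entry : startTimesMs.entrySet()) {
            long durationMs = nowMs - entry.getValue();
            if (filterMs < 0 || durationMs > filterMs) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    /** Assumed timeline: t0/t1 start at 0 ms, t2/t3 at 1000 ms, t4..t7 at 2000 ms. */
    static Map<String, Long> sampleStartTimes() {
        Map<String, Long> starts = new LinkedHashMap<>();
        starts.put("t0", 0L);
        starts.put("t1", 0L);
        starts.put("t2", 1000L);
        starts.put("t3", 1000L);
        for (String id : new String[] {"t4", "t5", "t6", "t7"}) {
            starts.put(id, 2000L);
        }
        return starts;
    }

    public static void main(String[] args) {
        Map<String, Long> starts = sampleStartTimes();
        long nowMs = 3000L; // assumed current mock time
        System.out.println(listTransactions(starts, nowMs, 0L));    // all eight ids
        System.out.println(listTransactions(starts, nowMs, 1000L)); // [t0, t1, t2, t3]
        System.out.println(listTransactions(starts, nowMs, 2000L)); // [t0, t1]
    }
}
```

Under these assumptions t2 and t3 have run for exactly 2000 ms, so a filter of 2000 excludes them because the comparison is strict.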
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1499698463

## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala: ##
@@ -513,12 +514,16 @@ class TransactionStateManagerTest {
     putTransaction(transactionalId = "t0", producerId = 0, state = Ongoing)
     putTransaction(transactionalId = "t1", producerId = 1, state = Ongoing)
+    // update time to create transactions with various durations
+    time.sleep(1000)
     putTransaction(transactionalId = "t2", producerId = 2, state = PrepareCommit)
     putTransaction(transactionalId = "t3", producerId = 3, state = PrepareAbort)
+    time.sleep(1000)

Review Comment:
We are using mock time, so this should not slow down the test.
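Editor's note: the point about mock time can be illustrated with a small manual clock. This is a hedged sketch, not Kafka's `MockTime`: `ManualClock` is a hypothetical class whose `sleep` only advances an internal counter, which is why "sleeping" 1000 ms costs the test no wall-clock time.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ManualClockSketch {

    /** Hypothetical stand-in for a mock clock; sleep() never blocks the thread. */
    static final class ManualClock {
        private final AtomicLong nowMs = new AtomicLong(0L);

        long milliseconds() {
            return nowMs.get();
        }

        void sleep(long ms) {
            // Advance the simulated time instead of calling Thread.sleep.
            nowMs.addAndGet(ms);
        }
    }

    public static void main(String[] args) {
        ManualClock time = new ManualClock();
        long wallStartNanos = System.nanoTime();

        long firstBatchStart = time.milliseconds();  // e.g. when t0 and t1 are created
        time.sleep(1000);
        long secondBatchStart = time.milliseconds(); // e.g. when t2 and t3 are created
        time.sleep(1000);

        System.out.println("first batch age (mock ms): " + (time.milliseconds() - firstBatchStart));
        System.out.println("second batch age (mock ms): " + (time.milliseconds() - secondBatchStart));
        System.out.println("wall-clock ms spent: " + (System.nanoTime() - wallStartNanos) / 1_000_000);
    }
}
```

With a clock like this, shrinking the sleep values would change the simulated durations but would not make the test noticeably faster.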
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
RamanVerma commented on PR #15384: URL: https://github.com/apache/kafka/pull/15384#issuecomment-1959796902 I think the unit tests look good. Is it possible to write (or edit, if one already exists) an integration test that builds a request and validates the response from the broker? You will need to feed some transactions into the broker's transactionManager cache as your test data.
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
RamanVerma commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1499456622

## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala: ##
@@ -527,16 +532,20 @@ class TransactionStateManagerTest {
     def assertListTransactions(
       expectedTransactionalIds: Set[String],
       filterProducerIds: Set[Long] = Set.empty,
-      filterStates: Set[String] = Set.empty
+      filterStates: Set[String] = Set.empty,
+      filterDuration: Long = -1L
     ): Unit = {
-      val listResponse = transactionManager.listTransactionStates(filterProducerIds, filterStates)
+      val listResponse = transactionManager.listTransactionStates(filterProducerIds, filterStates, filterDuration)
       assertEquals(Errors.NONE, Errors.forCode(listResponse.errorCode))
       assertEquals(expectedTransactionalIds, listResponse.transactionStates.asScala.map(_.transactionalId).toSet)
       val expectedUnknownStates = filterStates.filter(state => TransactionState.fromName(state).isEmpty)
       assertEquals(expectedUnknownStates, listResponse.unknownStateFilters.asScala.toSet)
     }
-    assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7"))
+    assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7"), filterDuration = 0L)
+    assertListTransactions(Set("t0", "t1", "t2", "t3"), filterDuration = 1000L)
+    assertListTransactions(Set("t0", "t1"), filterDuration = 2000L)

Review Comment:
curious: why are we not getting t2 and t3 in this test (duration greater than 2000L)?

## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala: ##
@@ -513,12 +514,16 @@ class TransactionStateManagerTest {
     putTransaction(transactionalId = "t0", producerId = 0, state = Ongoing)
     putTransaction(transactionalId = "t1", producerId = 1, state = Ongoing)
+    // update time to create transactions with various durations
+    time.sleep(1000)
     putTransaction(transactionalId = "t2", producerId = 2, state = PrepareCommit)
     putTransaction(transactionalId = "t3", producerId = 3, state = PrepareAbort)
+    time.sleep(1000)

Review Comment:
We should reduce the sleep times to 100s of ms rather than 1000s. Similarly, change lines 545-548. This will speed up the test and still serve our purpose.
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1496555226

## tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java: ##
@@ -436,16 +436,26 @@ public String name() {
         @Override
         public void addSubparser(Subparsers subparsers) {
-            subparsers.addParser(name())
+            Subparser subparser = subparsers.addParser(name())
                 .help("list transactions");
+
+            subparser.addArgument("--duration-filter")
+                    .help("Duration (in millis) to filter by: if < 0, all transactions will be returned; " +
+                            "otherwise, only transactions running longer than this duration will be returned")
+                    .action(store())
+                    .type(Long.class)
+                    .required(false);
         }
         @Override
         public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            ListTransactionsOptions options = new ListTransactionsOptions();
+            Optional.ofNullable(ns.getLong("duration_filter")).ifPresent(options::durationFilter);

Review Comment:
please update this line as well to reflect the new name
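Editor's note: the `Optional.ofNullable(...).ifPresent(...)` line in this hunk applies the filter only when the flag was supplied. The sketch below shows that pattern in isolation. It is an illustration under stated assumptions: `ListOptions` and `fromParsedArgs` are hypothetical, a plain `Map` stands in for the parsed argument namespace, and the method names `filterOnDuration` and `filteredDuration` follow the renames suggested elsewhere in this review thread rather than a confirmed final API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OptionalFlagSketch {

    /** Hypothetical options holder; -1 means "no duration filter". */
    static final class ListOptions {
        private long filteredDurationMs = -1L;

        ListOptions filterOnDuration(long durationMs) {
            this.filteredDurationMs = durationMs;
            return this;
        }

        long filteredDuration() {
            return filteredDurationMs;
        }
    }

    /** Builds options from parsed arguments; an absent flag leaves the default in place. */
    static ListOptions fromParsedArgs(Map<String, Long> parsed) {
        ListOptions options = new ListOptions();
        // get() returns null when the flag was not given, so ifPresent is skipped.
        Optional.ofNullable(parsed.get("duration_filter")).ifPresent(options::filterOnDuration);
        return options;
    }

    public static void main(String[] args) {
        Map<String, Long> withoutFlag = new HashMap<>();
        Map<String, Long> withFlag = new HashMap<>();
        withFlag.put("duration_filter", 5000L);

        System.out.println(fromParsedArgs(withoutFlag).filteredDuration()); // -1
        System.out.println(fromParsedArgs(withFlag).filteredDuration());    // 5000
    }
}
```

Keeping the default negative means that omitting the flag lists every transaction, which matches the help text in the hunk above.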
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1496360067

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
         return filteredProducerIds;
     }
+    public long getDurationFilter() {
+        return durationFilter;
+    }
+
     @Override
     public String toString() {
         return "ListTransactionsOptions(" +
             "filteredStates=" + filteredStates +
             ", filteredProducerIds=" + filteredProducerIds +
+            ", durationFilter=" + durationFilter +

Review Comment:
updated in [8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
         return filteredProducerIds;
     }
+    public long getDurationFilter() {

Review Comment:
updated in [8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -61,6 +62,11 @@ public ListTransactionsOptions filterProducerIds(Collection<Long> producerIdFilt
         return this;
     }
+    public ListTransactionsOptions durationFilter(long durationMs) {

Review Comment:
updated in [8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1496339739

## tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java: ##
@@ -187,14 +187,25 @@ private void testDescribeProducers(
         assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
     }
-    @Test
-    public void testListTransactions() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testListTransactions(boolean hasDurationFilter) throws Exception {
         String[] args = new String[] {
             "--bootstrap-server",
             "localhost:9092",
             "list"
         };
+        if (hasDurationFilter) {
+            args = new String[] {
+                "--bootstrap-server",
+                "localhost:9092",
+                "list",
+                "--duration-filter",
+                Long.toString(Long.MAX_VALUE)

Review Comment:
This just tests that the CLI command parses the parameters correctly. It will not actually list transactions.
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1496132259

## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   def handleListTransactions(
     filteredProducerIds: Set[Long],
-    filteredStates: Set[String]
+    filteredStates: Set[String],
+    durationFilter: Long = -1

Review Comment:
I'm not sure what he is supposed to change 😅
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
RamanVerma commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1494984871

## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   def handleListTransactions(
     filteredProducerIds: Set[Long],
-    filteredStates: Set[String]
+    filteredStates: Set[String],
+    durationFilter: Long = -1

Review Comment:
@yyu1993 this default value needs to be changed to -1L as well

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -61,6 +62,11 @@ public ListTransactionsOptions filterProducerIds(Collection<Long> producerIdFilt
         return this;
     }
+    public ListTransactionsOptions durationFilter(long durationMs) {

Review Comment:
Please add a comment to this method like we have for the other methods above. Also, we should probably change the method name to filterOnDuration, to match the rest of the filter setting methods.

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
         return filteredProducerIds;
     }
+    public long getDurationFilter() {

Review Comment:
Please add a Javadoc comment to the method. Also, change the method name to `filteredDuration`

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
         return filteredProducerIds;
     }
+    public long getDurationFilter() {
+        return durationFilter;
+    }
+
     @Override
     public String toString() {
         return "ListTransactionsOptions(" +
             "filteredStates=" + filteredStates +
             ", filteredProducerIds=" + filteredProducerIds +
+            ", durationFilter=" + durationFilter +

Review Comment:
nit: durationFilter -> filteredDuration

## tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java: ##
@@ -187,14 +187,25 @@ private void testDescribeProducers(
         assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
     }
-    @Test
-    public void testListTransactions() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testListTransactions(boolean hasDurationFilter) throws Exception {
         String[] args = new String[] {
             "--bootstrap-server",
             "localhost:9092",
             "list"
         };
+        if (hasDurationFilter) {
+            args = new String[] {
+                "--bootstrap-server",
+                "localhost:9092",
+                "list",
+                "--duration-filter",
+                Long.toString(Long.MAX_VALUE)

Review Comment:
hmm this will not return anything, right?
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493062297

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -61,6 +62,11 @@ public ListTransactionsOptions filterProducerIds(Collection<Long> producerIdFilt
         return this;
     }
+    public ListTransactionsOptions durationFilter(Long durationMs) {

Review Comment:
nit: these can all be primitive longs (lowercase). There is usage in this file and the Admin file.
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493050657

## clients/src/main/resources/common/message/ListTransactionsResponse.json: ##
@@ -17,7 +17,8 @@
   "apiKey": 66,
   "type": "response",
   "name": "ListTransactionsResponse",
-  "validVersions": "0",
+  // Version 1 is the same as vesion 0 (KIP-994).

Review Comment:
Nice catch. Fixed.
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493041438

## clients/src/main/java/org/apache/kafka/clients/admin/Admin.java: ##
@@ -1633,7 +1633,8 @@ default ListTransactionsResult listTransactions() {
      * coordinators in the cluster and collect the state of all transactions. Users
      * should typically attempt to reduce the size of the result set using
      * {@link ListTransactionsOptions#filterProducerIds(Collection)} or
-     * {@link ListTransactionsOptions#filterStates(Collection)}
+     * {@link ListTransactionsOptions#filterStates(Collection)} or

Review Comment:
The ListTransactions API itself is able to filter by producer IDs and states (see `handleListTransactions` in TransactionCoordinator). However, the CLI tool does not currently support those filters.
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493039266

## clients/src/main/resources/common/message/ListTransactionsRequest.json: ##
@@ -18,14 +18,18 @@
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "ListTransactionsRequest",
-  "validVersions": "0",
+  // version 1: adds DurationFilter to list transactions older than specified duration

Review Comment:
nit: most specs capitalize V here.

## clients/src/main/resources/common/message/ListTransactionsResponse.json: ##
@@ -17,7 +17,8 @@
   "apiKey": 66,
   "type": "response",
   "name": "ListTransactionsResponse",
-  "validVersions": "0",
+  // Version 1 is the same as vesion 0 (KIP-994).

Review Comment:
nit: version
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493024798

## clients/src/main/java/org/apache/kafka/clients/admin/Admin.java: ##
@@ -1633,7 +1633,8 @@ default ListTransactionsResult listTransactions() {
      * coordinators in the cluster and collect the state of all transactions. Users
      * should typically attempt to reduce the size of the result set using
      * {@link ListTransactionsOptions#filterProducerIds(Collection)} or
-     * {@link ListTransactionsOptions#filterStates(Collection)}
+     * {@link ListTransactionsOptions#filterStates(Collection)} or

Review Comment:
Maybe a silly question, but I noticed this comment mentions filtering by producer ID or state. Do you know where that is done? It looks like durationFilter is the only subparser argument for list in TransactionsCommand.
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492876179

## tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java: ##
@@ -436,16 +436,25 @@ public String name() {
         @Override
         public void addSubparser(Subparsers subparsers) {
-            subparsers.addParser(name())
+            Subparser subparser = subparsers.addParser(name())
                 .help("list transactions");
+
+            subparser.addArgument("--duration-filter")
+                    .help("filter duration of transaction in ms, only transactions running longer than this duration will be returned")

Review Comment:
Updated the help message in [81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821)
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492875733

## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   def handleListTransactions(
     filteredProducerIds: Set[Long],
-    filteredStates: Set[String]
+    filteredStates: Set[String],
+    durationFilter: Long = -1

Review Comment:
Updated default value to -1L in other places for consistency in [81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821)
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492874583

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -35,6 +35,7 @@ public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOpt
     private Set<TransactionState> filteredStates = Collections.emptySet();
     private Set<Long> filteredProducerIds = Collections.emptySet();
+    private Long durationFilter = 0L;

Review Comment:
Updated default value to -1L in [81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821)
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492849366

## tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java: ##
@@ -436,16 +436,25 @@ public String name() {
         @Override
         public void addSubparser(Subparsers subparsers) {
-            subparsers.addParser(name())
+            Subparser subparser = subparsers.addParser(name())
                 .help("list transactions");
+
+            subparser.addArgument("--duration-filter")
+                    .help("filter duration of transaction in ms, only transactions running longer than this duration will be returned")

Review Comment:
We should mention the default/value that gives all running transactions. Providing 0 and getting a ton of transactions might be confusing.
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492847691

## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   def handleListTransactions(
     filteredProducerIds: Set[Long],
-    filteredStates: Set[String]
+    filteredStates: Set[String],
+    durationFilter: Long = -1

Review Comment:
nit: this is -1 -- is that intended?
Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492846127

## clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java: ##
@@ -35,6 +35,7 @@ public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOpt
     private Set<TransactionState> filteredStates = Collections.emptySet();
     private Set<Long> filteredProducerIds = Collections.emptySet();
+    private Long durationFilter = 0L;

Review Comment:
nit: any reason we chose the default to be zero and not -1?
[PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
yyu1993 opened a new pull request, #15384:
URL: https://github.com/apache/kafka/pull/15384

Introduces a new filter in the ListTransactionsRequest API. This enables callers to filter on transactions that have been running for longer than a certain duration.

This PR includes the following changes:
1. Bumps the version of the ListTransactionsRequest API to 1. Sets the durationFilter to 0 when communicating with an older broker that does not support version 1.
2. Bumps the version of ListTransactionsResponse to 1 without changing the response structure.
3. Adds a durationFilter option to `kafka-transactions.sh --list`.

Tests:
- Client-side test to build a request with the correct combination of duration filter and API version: `testBuildRequestWithDurationFilter`

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)