Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-24 Thread via GitHub


jolshan merged PR #15384:
URL: https://github.com/apache/kafka/pull/15384


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-22 Thread via GitHub


jolshan commented on PR #15384:
URL: https://github.com/apache/kafka/pull/15384#issuecomment-1960018327

   Something is wrong with the build, so I will restart it.





Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-22 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1499709434


##
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala:
##
@@ -513,12 +514,16 @@ class TransactionStateManagerTest {
 
 putTransaction(transactionalId = "t0", producerId = 0, state = Ongoing)
 putTransaction(transactionalId = "t1", producerId = 1, state = Ongoing)
+// update time to create transactions with various durations
+time.sleep(1000)
 putTransaction(transactionalId = "t2", producerId = 2, state = PrepareCommit)
 putTransaction(transactionalId = "t3", producerId = 3, state = PrepareAbort)
+time.sleep(1000)

Review Comment:
   We should always use mock time :) 






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-22 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1499702168


##
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala:
##
@@ -527,16 +532,20 @@ class TransactionStateManagerTest {
 def assertListTransactions(
   expectedTransactionalIds: Set[String],
   filterProducerIds: Set[Long] = Set.empty,
-  filterStates: Set[String] = Set.empty
+  filterStates: Set[String] = Set.empty,
+  filterDuration: Long = -1L
 ): Unit = {
-  val listResponse = transactionManager.listTransactionStates(filterProducerIds, filterStates)
+  val listResponse = transactionManager.listTransactionStates(filterProducerIds, filterStates, filterDuration)
   assertEquals(Errors.NONE, Errors.forCode(listResponse.errorCode))
   assertEquals(expectedTransactionalIds, listResponse.transactionStates.asScala.map(_.transactionalId).toSet)
   val expectedUnknownStates = filterStates.filter(state => TransactionState.fromName(state).isEmpty)
   assertEquals(expectedUnknownStates, listResponse.unknownStateFilters.asScala.toSet)
 }
-
 assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7"))
+assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7"), filterDuration = 0L)
+assertListTransactions(Set("t0", "t1", "t2", "t3"), filterDuration = 1000L)
+assertListTransactions(Set("t0", "t1"), filterDuration = 2000L)

Review Comment:
   Their durations are 2000L so they will be filtered out.
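
For readers following the boundary case, here is a minimal sketch of the comparison this reply seems to rely on. It assumes "their" refers to t2 and t3, and that the filter is a strict "running longer than" check with negative values disabling it, as the `--duration-filter` help text elsewhere in this thread describes. `DurationFilterSketch` and `shouldInclude` are hypothetical names, not the broker code.

```java
final class DurationFilterSketch {
    // Assumed semantics: a negative filter disables filtering; otherwise only
    // transactions running strictly longer than the filter are kept.
    static boolean shouldInclude(long runningMs, long filterDurationMs) {
        return filterDurationMs < 0 || runningMs > filterDurationMs;
    }

    public static void main(String[] args) {
        long runningMs = 2000L; // duration stated in the reply above
        System.out.println(shouldInclude(runningMs, 1000L)); // true
        System.out.println(shouldInclude(runningMs, 2000L)); // false
        System.out.println(shouldInclude(runningMs, -1L));   // true
    }
}
```

Under these assumptions a transaction that has run for exactly 2000 ms passes a 1000 ms filter but not a 2000 ms one, which matches the expectations in the quoted test.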






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-22 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1499698463


##
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala:
##
@@ -513,12 +514,16 @@ class TransactionStateManagerTest {
 
 putTransaction(transactionalId = "t0", producerId = 0, state = Ongoing)
 putTransaction(transactionalId = "t1", producerId = 1, state = Ongoing)
+// update time to create transactions with various durations
+time.sleep(1000)
 putTransaction(transactionalId = "t2", producerId = 2, state = PrepareCommit)
 putTransaction(transactionalId = "t3", producerId = 3, state = PrepareAbort)
+time.sleep(1000)

Review Comment:
   We are using mock time, so this should not slow down the test. 
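
To illustrate why the `time.sleep(1000)` calls cost nothing, here is a rough sketch of a manually advanced clock, assuming the test's `time` behaves along these lines. `ManualClock` is a hypothetical stand-in written for this note, not the mock clock class the Kafka tests actually use.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a mock clock used in unit tests.
final class ManualClock {
    private final AtomicLong nowMs = new AtomicLong(0L);

    // Current mock time in milliseconds.
    long milliseconds() {
        return nowMs.get();
    }

    // "Sleeping" only advances the counter; the calling thread never blocks.
    void sleep(long ms) {
        if (ms < 0) {
            throw new IllegalArgumentException("ms must be >= 0");
        }
        nowMs.addAndGet(ms);
    }

    public static void main(String[] args) {
        ManualClock time = new ManualClock();
        long startedAt = time.milliseconds();
        time.sleep(1000);
        time.sleep(1000);
        System.out.println("elapsed mock ms: " + (time.milliseconds() - startedAt));
    }
}
```

With a clock like this, the size of the sleep argument affects only the recorded timestamps, not the wall-clock duration of the test.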






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-22 Thread via GitHub


RamanVerma commented on PR #15384:
URL: https://github.com/apache/kafka/pull/15384#issuecomment-1959796902

   I think the unit tests look good.
   
   Is it possible to write (or edit, if one already exists) an integration 
test that builds a request and validates the response from the broker? You will 
need to feed some transactions into the broker's transactionManager cache as 
your test data.





Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-22 Thread via GitHub


RamanVerma commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1499456622


##
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala:
##
@@ -527,16 +532,20 @@ class TransactionStateManagerTest {
 def assertListTransactions(
   expectedTransactionalIds: Set[String],
   filterProducerIds: Set[Long] = Set.empty,
-  filterStates: Set[String] = Set.empty
+  filterStates: Set[String] = Set.empty,
+  filterDuration: Long = -1L
 ): Unit = {
-  val listResponse = transactionManager.listTransactionStates(filterProducerIds, filterStates)
+  val listResponse = transactionManager.listTransactionStates(filterProducerIds, filterStates, filterDuration)
   assertEquals(Errors.NONE, Errors.forCode(listResponse.errorCode))
   assertEquals(expectedTransactionalIds, listResponse.transactionStates.asScala.map(_.transactionalId).toSet)
   val expectedUnknownStates = filterStates.filter(state => TransactionState.fromName(state).isEmpty)
   assertEquals(expectedUnknownStates, listResponse.unknownStateFilters.asScala.toSet)
 }
-
 assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7"))
+assertListTransactions(Set("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7"), filterDuration = 0L)
+assertListTransactions(Set("t0", "t1", "t2", "t3"), filterDuration = 1000L)
+assertListTransactions(Set("t0", "t1"), filterDuration = 2000L)

Review Comment:
   Curious: why are we not getting t2 and t3 in this test (duration greater 
than 2000L)?



##
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala:
##
@@ -513,12 +514,16 @@ class TransactionStateManagerTest {
 
 putTransaction(transactionalId = "t0", producerId = 0, state = Ongoing)
 putTransaction(transactionalId = "t1", producerId = 1, state = Ongoing)
+// update time to create transactions with various durations
+time.sleep(1000)
 putTransaction(transactionalId = "t2", producerId = 2, state = PrepareCommit)
 putTransaction(transactionalId = "t3", producerId = 3, state = PrepareAbort)
+time.sleep(1000)

Review Comment:
   We should reduce the sleep times to hundreds of milliseconds rather than 
thousands. Similarly, change lines 545-548. This will speed up the test and 
still serve our purpose.






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-20 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1496555226


##
tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java:
##
@@ -436,16 +436,26 @@ public String name() {
 
 @Override
 public void addSubparser(Subparsers subparsers) {
-subparsers.addParser(name())
+Subparser subparser = subparsers.addParser(name())
 .help("list transactions");
+
+subparser.addArgument("--duration-filter")
+.help("Duration (in millis) to filter by: if < 0, all transactions will be returned; " +
+"otherwise, only transactions running longer than this duration will be returned")
+.action(store())
+.type(Long.class)
+.required(false);
 }
 
 @Override
 public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+ListTransactionsOptions options = new ListTransactionsOptions();
+Optional.ofNullable(ns.getLong("duration_filter")).ifPresent(options::durationFilter);

Review Comment:
   please update this line as well to reflect the new name
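
A minimal sketch of the pattern in the `execute` hunk above, where the option is applied only when the flag was supplied. It assumes the unset default is -1 (as settled elsewhere in this thread). `Options`, `filteredDuration`, and the `Map` standing in for the parsed `Namespace` are hypothetical and only illustrate the null-handling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class OptionalArgSketch {
    // Hypothetical stand-in for the options object; -1 is assumed to mean "no duration filter".
    static final class Options {
        private long durationFilter = -1L;

        Options durationFilter(long durationMs) {
            this.durationFilter = durationMs;
            return this;
        }

        long filteredDuration() {
            return durationFilter;
        }
    }

    // A Map stands in for the parsed arguments; a missing key models an omitted flag.
    static Options fromParsedArgs(Map<String, Long> parsed) {
        Options options = new Options();
        // Only override the default when the flag was actually given.
        Optional.ofNullable(parsed.get("duration_filter")).ifPresent(options::durationFilter);
        return options;
    }

    public static void main(String[] args) {
        Map<String, Long> withFlag = new HashMap<>();
        withFlag.put("duration_filter", 5000L);
        System.out.println(fromParsedArgs(withFlag).filteredDuration());        // 5000
        System.out.println(fromParsedArgs(new HashMap<>()).filteredDuration()); // -1
    }
}
```

Wrapping the nullable lookup in `Optional` keeps the default untouched when the flag is omitted, so the setter can take a primitive `long` without risking a null unboxing.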






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-20 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1496360067


##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
 return filteredProducerIds;
 }
 
+public long getDurationFilter() {
+return durationFilter;
+}
+
 @Override
 public String toString() {
 return "ListTransactionsOptions(" +
 "filteredStates=" + filteredStates +
 ", filteredProducerIds=" + filteredProducerIds +
+", durationFilter=" + durationFilter +

Review Comment:
   updated in 
[8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)



##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
 return filteredProducerIds;
 }
 
+public long getDurationFilter() {

Review Comment:
   updated in 
[8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)



##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -61,6 +62,11 @@ public ListTransactionsOptions filterProducerIds(Collection<Long> producerIdFilt
 return this;
 }
 
+public ListTransactionsOptions durationFilter(long durationMs) {

Review Comment:
   updated in 
[8bfc13a](https://github.com/apache/kafka/pull/15384/commits/8bfc13ae7551ecbc6df2502a4904892a4d1402b7)






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-20 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1496339739


##
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java:
##
@@ -187,14 +187,25 @@ private void testDescribeProducers(
 assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
 }
 
-@Test
-public void testListTransactions() throws Exception {
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testListTransactions(boolean hasDurationFilter) throws Exception {
 String[] args = new String[] {
 "--bootstrap-server",
 "localhost:9092",
 "list"
 };
 
+if (hasDurationFilter) {
+args = new String[] {
+"--bootstrap-server",
+"localhost:9092",
+"list",
+"--duration-filter",
+Long.toString(Long.MAX_VALUE)

Review Comment:
   This only tests that the CLI command parses its parameters correctly. It 
does not actually call list transactions.






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-20 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1496132259


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
   def handleListTransactions(
 filteredProducerIds: Set[Long],
-filteredStates: Set[String]
+filteredStates: Set[String],
+durationFilter: Long = -1

Review Comment:
   I'm not sure what he is supposed to change.






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-19 Thread via GitHub


RamanVerma commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1494984871


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
   def handleListTransactions(
 filteredProducerIds: Set[Long],
-filteredStates: Set[String]
+filteredStates: Set[String],
+durationFilter: Long = -1

Review Comment:
   @yyu1993 this default value needs to be changed to -1L as well



##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -61,6 +62,11 @@ public ListTransactionsOptions filterProducerIds(Collection<Long> producerIdFilt
 return this;
 }
 
+public ListTransactionsOptions durationFilter(long durationMs) {

Review Comment:
   Please add a comment to this method like we have for the other methods 
above. Also, we should probably change the method name to filterOnDuration, to 
match the rest of the filter-setting methods.



##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
 return filteredProducerIds;
 }
 
+public long getDurationFilter() {

Review Comment:
   Please add a Javadoc comment to the method. Also, change the method name to 
`filteredDuration`.



##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
 return filteredProducerIds;
 }
 
+public long getDurationFilter() {
+return durationFilter;
+}
+
 @Override
 public String toString() {
 return "ListTransactionsOptions(" +
 "filteredStates=" + filteredStates +
 ", filteredProducerIds=" + filteredProducerIds +
+", durationFilter=" + durationFilter +

Review Comment:
   nit: durationFilter -> filteredDuration



##
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java:
##
@@ -187,14 +187,25 @@ private void testDescribeProducers(
 assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
 }
 
-@Test
-public void testListTransactions() throws Exception {
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testListTransactions(boolean hasDurationFilter) throws Exception {
 String[] args = new String[] {
 "--bootstrap-server",
 "localhost:9092",
 "list"
 };
 
+if (hasDurationFilter) {
+args = new String[] {
+"--bootstrap-server",
+"localhost:9092",
+"list",
+"--duration-filter",
+Long.toString(Long.MAX_VALUE)

Review Comment:
   hmm this will not return anything, right?






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493062297


##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -61,6 +62,11 @@ public ListTransactionsOptions filterProducerIds(Collection<Long> producerIdFilt
 return this;
 }
 
+public ListTransactionsOptions durationFilter(Long durationMs) {

Review Comment:
   nit: these can all be primitive longs (lowercase). There is usage in this 
file and in the Admin file.






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493050657


##
clients/src/main/resources/common/message/ListTransactionsResponse.json:
##
@@ -17,7 +17,8 @@
   "apiKey": 66,
   "type": "response",
   "name": "ListTransactionsResponse",
-  "validVersions": "0",
+  // Version 1 is the same as vesion 0 (KIP-994).

Review Comment:
   Nice catch. Fixed.






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493041438


##
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##
@@ -1633,7 +1633,8 @@ default ListTransactionsResult listTransactions() {
  * coordinators in the cluster and collect the state of all transactions. Users
  * should typically attempt to reduce the size of the result set using
  * {@link ListTransactionsOptions#filterProducerIds(Collection)} or
- * {@link ListTransactionsOptions#filterStates(Collection)}
+ * {@link ListTransactionsOptions#filterStates(Collection)} or

Review Comment:
   The ListTransactions api itself is able to filter by producer ids and states 
(see `handleListTransactions` in TransactionCoordinator). However, the cli tool 
does not support those filters currently. 
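
A rough sketch of how set-based filters like these can be combined, for readers unfamiliar with the API-side filtering mentioned above. It assumes an empty filter set means "no restriction" (as the `Set.empty` defaults in the test helper quoted elsewhere in this thread suggest) and that the filters are ANDed together. `SetFilterSketch` and `shouldInclude` are hypothetical names, and this is not copied from `handleListTransactions`.

```java
import java.util.Collections;
import java.util.Set;

final class SetFilterSketch {
    // Assumed semantics: an empty filter set matches everything; non-empty sets must contain the value.
    static boolean shouldInclude(long producerId, String state,
                                 Set<Long> filterProducerIds, Set<String> filterStates) {
        boolean producerOk = filterProducerIds.isEmpty() || filterProducerIds.contains(producerId);
        boolean stateOk = filterStates.isEmpty() || filterStates.contains(state);
        return producerOk && stateOk;
    }

    public static void main(String[] args) {
        Set<Long> noProducers = Collections.emptySet();
        Set<String> noStates = Collections.emptySet();
        Set<String> ongoingOnly = Collections.singleton("Ongoing");

        System.out.println(shouldInclude(1L, "Ongoing", noProducers, noStates));          // true
        System.out.println(shouldInclude(2L, "PrepareCommit", noProducers, ongoingOnly)); // false
    }
}
```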






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493039266


##
clients/src/main/resources/common/message/ListTransactionsRequest.json:
##
@@ -18,14 +18,18 @@
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "ListTransactionsRequest",
-  "validVersions": "0",
+  // version 1: adds DurationFilter to list transactions older than specified duration

Review Comment:
   nit: most specs capitalize V here.



##
clients/src/main/resources/common/message/ListTransactionsResponse.json:
##
@@ -17,7 +17,8 @@
   "apiKey": 66,
   "type": "response",
   "name": "ListTransactionsResponse",
-  "validVersions": "0",
+  // Version 1 is the same as vesion 0 (KIP-994).

Review Comment:
   nit: version






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1493024798


##
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##
@@ -1633,7 +1633,8 @@ default ListTransactionsResult listTransactions() {
  * coordinators in the cluster and collect the state of all transactions. Users
  * should typically attempt to reduce the size of the result set using
  * {@link ListTransactionsOptions#filterProducerIds(Collection)} or
- * {@link ListTransactionsOptions#filterStates(Collection)}
+ * {@link ListTransactionsOptions#filterStates(Collection)} or

Review Comment:
   Maybe a silly question, but I noticed this comment mentions filtering by 
producer ID or state. Do you know where that is done? It looks like 
durationFilter is the only subparser argument for list in TransactionsCommand.






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492876179


##
tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java:
##
@@ -436,16 +436,25 @@ public String name() {
 
 @Override
 public void addSubparser(Subparsers subparsers) {
-subparsers.addParser(name())
+Subparser subparser = subparsers.addParser(name())
 .help("list transactions");
+
+subparser.addArgument("--duration-filter")
+.help("filter duration of transaction in ms, only transactions running longer than this duration will be returned")

Review Comment:
   Updated the help message in 
[81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821)






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492875733


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
   def handleListTransactions(
 filteredProducerIds: Set[Long],
-filteredStates: Set[String]
+filteredStates: Set[String],
+durationFilter: Long = -1

Review Comment:
   Updated default value to -1L in other places for consistency in 
[81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821)






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492874583


##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -35,6 +35,7 @@ public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
 private Set<TransactionState> filteredStates = Collections.emptySet();
 private Set<Long> filteredProducerIds = Collections.emptySet();
 
+private Long durationFilter = 0L;

Review Comment:
   Updated default value to -1L in 
[81173c8](https://github.com/apache/kafka/pull/15384/commits/81173c89dbf50a98671694f99a051dad1e0b1821)






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492849366


##
tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java:
##
@@ -436,16 +436,25 @@ public String name() {
 
 @Override
 public void addSubparser(Subparsers subparsers) {
-subparsers.addParser(name())
+Subparser subparser = subparsers.addParser(name())
 .help("list transactions");
+
+subparser.addArgument("--duration-filter")
+.help("filter duration of transaction in ms, only transactions running longer than this duration will be returned")

Review Comment:
   we should mention the default/value that gives all running transactions. 
Providing 0 and getting a ton of transactions might be confusing.






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492847691


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
   def handleListTransactions(
 filteredProducerIds: Set[Long],
-filteredStates: Set[String]
+filteredStates: Set[String],
+durationFilter: Long = -1

Review Comment:
   nit: this is -1 -- is that intended?






Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


jolshan commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1492846127


##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -35,6 +35,7 @@ public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
 private Set<TransactionState> filteredStates = Collections.emptySet();
 private Set<Long> filteredProducerIds = Collections.emptySet();
 
+private Long durationFilter = 0L;

Review Comment:
   nit: any reason we chose default to be zero and not -1?






[PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-16 Thread via GitHub


yyu1993 opened a new pull request, #15384:
URL: https://github.com/apache/kafka/pull/15384

   Introduces a new filter in the ListTransactionsRequest API. This enables 
callers to filter on transactions that have been running for longer than a 
given duration.
   This PR includes the following changes:
   1. Bumps the version of the ListTransactionsRequest API to 1. Sets the 
durationFilter to 0 when communicating with an older broker that does not 
support version 1.
   2. Bumps the version of ListTransactionsResponse to 1 without changing the 
response structure.
   3. Adds a durationFilter option to `kafka-transactions.sh list`.
   
   Tests:
   
   - Client-side test that builds a request with the correct combination of 
duration filter and API version: `testBuildRequestWithDurationFilter`
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

