[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110443918


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
   Yep, you got there first 😄. I would probably vote to make the change. 
Tracking separate connections does not sound attractive. We could still do that 
even with the modified protocol if there is a noticeable regression for old 
clients.
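Assuming the change being voted on is the one floated in the related review comment in this thread (moving `VerifyOnly` from the top level of the request down to each `transactionalId` entry), here is a rough Java sketch of what that buys: every entry carries its own flag, so a single batched request on a single connection can mix both modes. `TxnEntry`, `verifyOnly`, and `addPartitions` are hypothetical stand-ins, not Kafka's generated `AddPartitionsToTxnRequestData` classes.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not Kafka code: VerifyOnly is modeled as a property of
// each transaction entry rather than of the whole request.
final class PerTransactionVerifyOnly {

    record TxnEntry(String transactionalId, long producerId, short producerEpoch,
                    boolean verifyOnly, List<String> partitions) {}

    // Entries that only check whether their partitions are already in the transaction.
    static List<TxnEntry> verifyOnly(List<TxnEntry> batch) {
        return batch.stream().filter(TxnEntry::verifyOnly).collect(Collectors.toList());
    }

    // Entries that actually add partitions to their transaction.
    static List<TxnEntry> addPartitions(List<TxnEntry> batch) {
        return batch.stream().filter(e -> !e.verifyOnly()).collect(Collectors.toList());
    }
}
```

With the flag per entry, the broker can still tell the two modes apart inside one request, which is what makes tracking separate connections per mode unnecessary.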



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110442234


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",

Review Comment:
   Would it make sense to set `latestVersionUnstable` so that we reserve some 
flexibility to change the API after we merge this?
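For context, a minimal sketch of what marking the latest version unstable is meant to achieve, assuming the flag keeps the newest version out of version negotiation unless unstable versions are explicitly enabled, so that v4 could still change after merge. `UnstableVersionGate` and `usableMaxVersion` are hypothetical; in Kafka the behavior is driven by the message JSON and broker configuration, not by code like this.

```java
// Hypothetical sketch, not Kafka code: models withholding an unstable newest
// API version from negotiation by default.
final class UnstableVersionGate {

    // Returns the highest version this side is willing to use or advertise.
    static short usableMaxVersion(short latestVersion,
                                  boolean latestVersionUnstable,
                                  boolean unstableVersionsEnabled) {
        if (latestVersionUnstable && !unstableVersionsEnabled) {
            // Hide the unstable version; peers negotiate against the previous one.
            return (short) (latestVersion - 1);
        }
        return latestVersion;
    }

    public static void main(String[] args) {
        System.out.println(usableMaxVersion((short) 4, true, false)); // 3
        System.out.println(usableMaxVersion((short) 4, true, true));  // 4
    }
}
```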



[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-17 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110441561


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,35 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions.
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
   Yeah, interesting point. I could see moving `VerifyOnly` to the level of 
`transactionalId`. An interesting corollary if we do batch both modes together 
is that verify-only requests may end up blocking on replication even though we 
are only checking the state in memory. This would kind of penalize old clients, 
but maybe that's acceptable. Unless we used separate connections for each mode, 
perhaps it is unavoidable.
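The blocking corollary can be illustrated with `CompletableFuture`: if the broker answers a batched request only once every entry is done, a verify-only entry that completes immediately from in-memory state still waits for an entry that is blocked on replication. This is a hypothetical illustration, not broker code; `MixedBatchLatency` and `batchedResponse` are invented names.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration: one response for the whole batch means the
// fastest entry is held back by the slowest one.
final class MixedBatchLatency {

    // The combined response completes only after all entries have completed.
    static CompletableFuture<Void> batchedResponse(CompletableFuture<?>... entries) {
        return CompletableFuture.allOf(entries);
    }

    public static void main(String[] args) {
        // Verify-only check answered straight from in-memory transaction state.
        CompletableFuture<String> verifyOnly = CompletableFuture.completedFuture("NONE");
        // Add-partitions entry that must wait for replication.
        CompletableFuture<String> addPartitions = new CompletableFuture<>();

        CompletableFuture<Void> response = batchedResponse(verifyOnly, addPartitions);
        System.out.println(response.isDone()); // false: still waiting on replication
        addPartitions.complete("NONE");        // replication finished
        System.out.println(response.isDone()); // true
    }
}
```

Splitting the modes across separate requests (or connections) would let the verify-only future be returned on its own, which is the trade-off weighed above.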



[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107764891


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -118,11 +193,41 @@ public AddPartitionsToTxnRequestData data() {
 
 @Override
 public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-final HashMap<TopicPartition, Errors> errors = new HashMap<>();
-for (TopicPartition partition : partitions()) {
-errors.put(partition, Errors.forException(e));
+Errors error = Errors.forException(e);
+if (version < 4) {
+final HashMap<TopicPartition, Errors> errors = new HashMap<>();
+for (TopicPartition partition : partitions()) {
+errors.put(partition, error);
+}
+return new AddPartitionsToTxnResponse(throttleTimeMs, errors);
+} else {
+AddPartitionsToTxnResponseData response = new AddPartitionsToTxnResponseData();

Review Comment:
   Many new APIs do include a top-level error code, so there is definitely 
precedent. I kind of like giving the broker an easy way to indicate a failure 
without requiring it to do a bunch of wasteful work. It's not a common case 
though, so I'm not sure how much it matters.
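A rough sketch of the trade-off, using simplified stand-in types rather than `AddPartitionsToTxnResponseData`: without a top-level error code, failing the whole request means filling in an error for every partition of every transaction, whereas a top-level field reports the failure in one place. `TopLevelErrorSketch`, `ErrorResponse`, and both helpers are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka code.
final class TopLevelErrorSketch {

    // Simplified batched response: an optional top-level error plus
    // per-transaction, per-partition errors (transactionalId -> partition -> error).
    record ErrorResponse(String topLevelError, Map<String, Map<String, String>> partitionErrors) {}

    // Without a top-level field, failing the request means touching every partition.
    static ErrorResponse perPartition(Map<String, List<String>> partitionsByTxn, String error) {
        Map<String, Map<String, String>> errors = new LinkedHashMap<>();
        partitionsByTxn.forEach((transactionalId, partitions) -> {
            Map<String, String> txnErrors = new LinkedHashMap<>();
            for (String partition : partitions) {
                txnErrors.put(partition, error);
            }
            errors.put(transactionalId, txnErrors);
        });
        return new ErrorResponse("NONE", errors);
    }

    // With a top-level field, a single value reports that the whole request failed.
    static ErrorResponse topLevel(String error) {
        return new ErrorResponse(error, Map.of());
    }
}
```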



[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107762271


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java:
##
@@ -99,6 +112,7 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
 data.setThrottleTimeMs(throttleTimeMs);
 }
 
+// Only used for versions < 4

Review Comment:
   As mentioned elsewhere, the old API is a special case of the new API where 
there just happens to be a single transactionalId. I think if we take that 
perspective, then we can make the methods general and hide the version details.
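One way to read this suggestion, sketched with plain maps: a version < 4 response is treated as a batch that happens to contain exactly one transaction, keyed by the request's `transactionalId`, so callers go through a single accessor regardless of version. `VersionAgnosticErrors.errorsByTransaction` is a hypothetical helper; passing the request's ID in rests on the assumption that the older response format does not carry a `transactionalId` of its own.

```java
import java.util.Map;

// Hypothetical sketch, not Kafka code: the old single-transaction shape is
// normalized into the batched shape (transactionalId -> partition -> error).
final class VersionAgnosticErrors {

    static Map<String, Map<String, String>> errorsByTransaction(
            short version,
            String requestTransactionalId,                       // used only for version < 4
            Map<String, String> singleTxnErrors,                 // version < 4 payload
            Map<String, Map<String, String>> batchedErrors) {    // version >= 4 payload
        if (version < 4) {
            // Old API: a batch of exactly one transaction.
            return Map.of(requestTransactionalId, singleTxnErrors);
        }
        return batchedErrors;
    }
}
```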



[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107760565


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java:
##
@@ -49,28 +52,37 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
 private final AddPartitionsToTxnResponseData data;
 
 private Map<TopicPartition, Errors> cachedErrorsMap = null;
+
+private Map<String, Map<TopicPartition, Errors>> cachedAllErrorsMap = null;
 
 public AddPartitionsToTxnResponse(AddPartitionsToTxnResponseData data) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
 this.data = data;
 }
 
+// Only used for versions < 4

Review Comment:
   Yeah, the problem is that it makes the usage more confusing. I guess I'd rather 
have a larger patch and try to abstract some of the version details away.



[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107759467


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -66,24 +97,22 @@ public Builder(final String transactionalId,
 AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
 for (Map.Entry<String, List<Integer>> partitionEntry : partitionMap.entrySet()) {
 topics.add(new AddPartitionsToTxnTopic()
-   .setName(partitionEntry.getKey())
-   .setPartitions(partitionEntry.getValue()));
+.setName(partitionEntry.getKey())
+.setPartitions(partitionEntry.getValue()));
 }
-
-this.data = new AddPartitionsToTxnRequestData()
-.setTransactionalId(transactionalId)
-.setProducerId(producerId)
-.setProducerEpoch(producerEpoch)
-.setTopics(topics);
+return topics;
 }
 
 @Override
 public AddPartitionsToTxnRequest build(short version) {
-return new AddPartitionsToTxnRequest(data, version);
+short clampedVersion = (isClientRequest && version > 3) ? 3 : version;

Review Comment:
   It comes from `AbstractRequest.Builder`. You can pass the min and max 
versions in the constructor.
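A minimal, dependency-free imitation of the suggestion: the builder carries an allowed version range, the client-side instance caps it at 3, and `build` rejects out-of-range versions instead of silently clamping them. `VersionRangeBuilder` and `negotiate` are hypothetical and only mimic the idea of passing oldest and latest allowed versions to the `AbstractRequest.Builder` constructor; the broker-side range of 4 to 4 is likewise an assumption for illustration.

```java
// Hypothetical imitation of a request builder that carries an allowed version
// range; this is not Kafka's AbstractRequest.Builder.
final class VersionRangeBuilder {
    private final String name;
    private final short oldestAllowedVersion;
    private final short latestAllowedVersion;

    VersionRangeBuilder(String name, short oldestAllowedVersion, short latestAllowedVersion) {
        this.name = name;
        this.oldestAllowedVersion = oldestAllowedVersion;
        this.latestAllowedVersion = latestAllowedVersion;
    }

    // Highest version allowed by both this builder and the peer's advertised range.
    short negotiate(short peerOldestVersion, short peerLatestVersion) {
        short version = (short) Math.min(latestAllowedVersion, peerLatestVersion);
        if (version < oldestAllowedVersion || version < peerOldestVersion) {
            throw new IllegalStateException(name + ": no version in common with peer");
        }
        return version;
    }

    // Builds only versions inside the allowed range instead of clamping silently.
    String build(short version) {
        if (version < oldestAllowedVersion || version > latestAllowedVersion) {
            throw new IllegalArgumentException(name + " cannot build version " + version);
        }
        return name + " request v" + version;
    }

    public static void main(String[] args) {
        VersionRangeBuilder client = new VersionRangeBuilder("client", (short) 0, (short) 3);
        VersionRangeBuilder broker = new VersionRangeBuilder("broker", (short) 4, (short) 4);
        short peerLatest = 4; // peer supports versions 0-4
        System.out.println(client.build(client.negotiate((short) 0, peerLatest))); // client request v3
        System.out.println(broker.build(broker.negotiate((short) 0, peerLatest))); // broker request v4
    }
}
```

Capping the range at construction time keeps the version decision in one place, so `build` never has to second-guess the version it is handed.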



[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107606857


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -35,21 +44,43 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
 private final AddPartitionsToTxnRequestData data;
 
 private List<TopicPartition> cachedPartitions = null;
+
+private Map<String, List<TopicPartition>> cachedPartitionsByTransaction = null;
+
+private final short version;
 
 public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
 public final AddPartitionsToTxnRequestData data;
+public final boolean isClientRequest;
 
-public Builder(final AddPartitionsToTxnRequestData data) {
+// Only used for versions < 4
+public Builder(String transactionalId,
+   long producerId,
+   short producerEpoch,
+   List<TopicPartition> partitions) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-this.data = data;
+this.isClientRequest = true;
+
+AddPartitionsToTxnTopicCollection topics = compileTopics(partitions);
+
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setTopics(topics);
 }
 
-public Builder(final String transactionalId,
-   final long producerId,
-   final short producerEpoch,
-   final List<TopicPartition> partitions) {
+public Builder(AddPartitionsToTxnTransactionCollection transactions,
+   boolean verifyOnly) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+this.isClientRequest = false;
 
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactions(transactions)
+.setVerifyOnly(verifyOnly);
+}
+
+private AddPartitionsToTxnTopicCollection compileTopics(final List<TopicPartition> partitions) {

Review Comment:
   nit: how about `buildTxnTopicCollection`?
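For reference, a dependency-free sketch of what a helper like the suggested `buildTxnTopicCollection` does: group partitions by topic so each topic appears once with its partition indexes. It returns a plain `Map` rather than an `AddPartitionsToTxnTopicCollection`, and `TopicPartition` here is a stand-in record, not `org.apache.kafka.common.TopicPartition`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka code.
final class TxnTopicGrouping {

    record TopicPartition(String topic, int partition) {}

    static Map<String, List<Integer>> buildTxnTopicCollection(List<TopicPartition> partitions) {
        // LinkedHashMap keeps topics in first-seen order, so the output is predictable.
        Map<String, List<Integer>> byTopic = new LinkedHashMap<>();
        for (TopicPartition tp : partitions) {
            byTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
        }
        return byTopic;
    }

    public static void main(String[] args) {
        List<TopicPartition> input = List.of(
            new TopicPartition("foo", 0),
            new TopicPartition("bar", 2),
            new TopicPartition("foo", 1));
        System.out.println(buildTxnTopicCollection(input)); // {foo=[0, 1], bar=[2]}
    }
}
```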



##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -35,21 +44,43 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
 private final AddPartitionsToTxnRequestData data;
 
 private List<TopicPartition> cachedPartitions = null;
+
+private Map<String, List<TopicPartition>> cachedPartitionsByTransaction = null;
+
+private final short version;
 
 public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
 public final AddPartitionsToTxnRequestData data;
+public final boolean isClientRequest;
 
-public Builder(final AddPartitionsToTxnRequestData data) {
+// Only used for versions < 4
+public Builder(String transactionalId,
+   long producerId,
+   short producerEpoch,
+   List<TopicPartition> partitions) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-this.data = data;
+this.isClientRequest = true;
+
+AddPartitionsToTxnTopicCollection topics = compileTopics(partitions);
+
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactionalId(transactionalId)

Review Comment:
   Does it make sense to set verifyOnly to false explicitly here?



##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -66,24 +97,22 @@ public Builder(final String transactionalId,
 AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
 for (Map.Entry<String, List<Integer>> partitionEntry : partitionMap.entrySet()) {
 topics.add(new AddPartitionsToTxnTopic()
-   .setName(partitionEntry.getKey())
-   .setPartitions(partitionEntry.getValue()));
+.setName(partitionEntry.getKey())
+.setPartitions(partitionEntry.getValue()));
 }
-
-this.data = new AddPartitionsToTxnRequestData()
-.setTransactionalId(transactionalId)
-.setProducerId(producerId)
-.setProducerEpoch(producerEpoch)
-.setTopics(topics);
+return topics;
 }
 
 @Override
 public AddPartitionsToTxnRequest build(short version) {
-return new AddPartitionsToTxnRequest(data, version);
+short clampedVersion = (isClientRequest && version > 3) ? 3 : version;

Review Comment:
   It's a little strange to ignore the version. I think another way to do this 
is to set the `latestAllowedVersion` to 3 in the client builder. That will 
ensure that the client does not try to use a higher version even if the broker 
su