[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110443918

## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ##
@@ -23,17 +23,35 @@
 // Version 2 adds the support for new error code PRODUCER_FENCED.
 //
 // Version 3 enables flexible versions.
- "validVersions": "0-3",
+ //
+ // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions.
+ "validVersions": "0-4",
 "flexibleVersions": "3+",
 "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
Yep, you got there first 😄. I would probably vote to make the change. Tracking separate connections does not sound attractive. We could still do that even with the modified protocol if there is a noticeable regression for old clients.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110442234

## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ##
@@ -23,17 +23,35 @@
 // Version 2 adds the support for new error code PRODUCER_FENCED.
 //
 // Version 3 enables flexible versions.
- "validVersions": "0-3",
+ //
+ // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions.
+ "validVersions": "0-4",

Review Comment:
Would it make sense to set `latestVersionUnstable` so that we reserve some flexibility to change the API after we merge this?

hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1110441561

## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ##
@@ -23,17 +23,35 @@
 // Version 2 adds the support for new error code PRODUCER_FENCED.
 //
 // Version 3 enables flexible versions.
- "validVersions": "0-3",
+ //
+ // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions.
+ "validVersions": "0-4",
 "flexibleVersions": "3+",
 "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
+{ "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,

Review Comment:
Yeah, interesting point. I could see moving `VerifyOnly` to the level of `transactionalId`. An interesting corollary if we do batch both modes together is that verify-only requests may end up blocking on replication even though we are only checking the state in memory. This would kind of penalize old clients, but maybe that's acceptable. Unless we used separate connections for each mode, perhaps it is unavoidable.

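One way to picture the per-transaction `VerifyOnly` idea and its corollary is the minimal sketch below. It is not Kafka code: `PerTransactionVerifySketch`, `Txn`, and `batchWaitsOnReplication` are hypothetical names, and the sketch assumes a batched response can only be sent once every entry in the batch has completed.

```java
import java.util.List;

// Hypothetical model, not Kafka's generated message classes.
final class PerTransactionVerifySketch {

    /** One entry of a batched request; the flag lives on the entry, not on the request. */
    static final class Txn {
        final String transactionalId;
        final boolean verifyOnly;

        Txn(String transactionalId, boolean verifyOnly) {
            this.transactionalId = transactionalId;
            this.verifyOnly = verifyOnly;
        }
    }

    /**
     * Assumption: the response for a batch is sent only when all entries are done.
     * If any entry actually adds partitions (and so must be replicated), the
     * verify-only entries in the same batch wait as well.
     */
    static boolean batchWaitsOnReplication(List<Txn> batch) {
        for (Txn txn : batch) {
            if (!txn.verifyOnly) {
                return true;
            }
        }
        return false;
    }
}
```

Under that assumption, a batch holding only verify-only entries can be answered from in-memory state, while a mixed batch cannot.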
hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107764891

## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ##
@@ -118,11 +193,41 @@ public AddPartitionsToTxnRequestData data() {
 @Override
 public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-final HashMap errors = new HashMap<>();
-for (TopicPartition partition : partitions()) {
-errors.put(partition, Errors.forException(e));
+Errors error = Errors.forException(e);
+if (version < 4) {
+final HashMap errors = new HashMap<>();
+for (TopicPartition partition : partitions()) {
+errors.put(partition, error);
+}
+return new AddPartitionsToTxnResponse(throttleTimeMs, errors);
+} else {
+AddPartitionsToTxnResponseData response = new AddPartitionsToTxnResponseData();

Review Comment:
Many new APIs do include a top-level error code, so there is definitely precedent. I kind of like giving the broker an easy way to indicate a failure without requiring it to do a bunch of wasteful work. It's not a common case though, so I'm not sure how much it matters.

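To make the trade-off concrete, here is a rough sketch of the two error-response shapes. It uses hypothetical stand-ins (`ErrorResponseSketch`, `Response`, plain strings for partitions, raw `short` error codes) rather than Kafka's real request and response classes, and it assumes the version 4+ response carries a single top-level error field.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are not Kafka's real classes.
final class ErrorResponseSketch {
    static final short NONE = 0;

    static final class Response {
        final short topLevelError;                 // assumed v4+ field
        final Map<String, Short> partitionErrors;  // keyed by "topic-partition"

        Response(short topLevelError, Map<String, Short> partitionErrors) {
            this.topLevelError = topLevelError;
            this.partitionErrors = partitionErrors;
        }
    }

    static Response errorResponse(short version, List<String> partitions, short errorCode) {
        if (version < 4) {
            // Older shape: the same error is repeated for every partition.
            Map<String, Short> errors = new HashMap<>();
            for (String partition : partitions) {
                errors.put(partition, errorCode);
            }
            return new Response(NONE, errors);
        }
        // Newer shape (assumed): one top-level code, no per-partition work.
        return new Response(errorCode, new HashMap<>());
    }
}
```

With the top-level code, the cost of reporting a request-wide failure no longer grows with the number of partitions in the request.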
hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107762271

## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java: ##
@@ -99,6 +112,7 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
 data.setThrottleTimeMs(throttleTimeMs);
 }
+// Only used for versions < 4

Review Comment:
As mentioned elsewhere, the old API is a special case of the new API where there just happens to be a single transactionalId. I think if we take that perspective, then we can make the methods general and hide the version details.

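The "special case" view might look roughly like the sketch below: both request shapes are normalized into one map keyed by transactional id, so callers never branch on the version. All names here (`PartitionsByTransactionSketch`, `fromSingle`, `fromBatch`, `totalPartitions`) are hypothetical, and partitions are plain strings rather than Kafka's `TopicPartition`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka.
final class PartitionsByTransactionSketch {

    /** Old (< v4) shape: a single transactional id becomes a one-entry batch. */
    static Map<String, List<String>> fromSingle(String transactionalId, List<String> partitions) {
        Map<String, List<String>> byTxn = new LinkedHashMap<>();
        byTxn.put(transactionalId, new ArrayList<>(partitions));
        return byTxn;
    }

    /** New (v4+) shape: already a batch, copied as-is. */
    static Map<String, List<String>> fromBatch(Map<String, List<String>> transactions) {
        return new LinkedHashMap<>(transactions);
    }

    /** A version-agnostic method: works on either shape once normalized. */
    static int totalPartitions(Map<String, List<String>> byTxn) {
        int total = 0;
        for (List<String> partitions : byTxn.values()) {
            total += partitions.size();
        }
        return total;
    }
}
```

Methods written against the normalized map need no "only used for versions < 4" caveat, which is the kind of generality the comment asks for.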
hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107760565

## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java: ##
@@ -49,28 +52,37 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
 private final AddPartitionsToTxnResponseData data;
 private Map cachedErrorsMap = null;
+
+private Map> cachedAllErrorsMap = null;
 public AddPartitionsToTxnResponse(AddPartitionsToTxnResponseData data) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
 this.data = data;
 }
+// Only used for versions < 4

Review Comment:
Yeah, the problem is that it makes the usage more confusing. I'd rather have a larger patch I guess and try to abstract some of the version details away.

hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107759467

## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ##
@@ -66,24 +97,22 @@ public Builder(final String transactionalId,
 AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
 for (Map.Entry> partitionEntry : partitionMap.entrySet()) {
 topics.add(new AddPartitionsToTxnTopic()
- .setName(partitionEntry.getKey())
- .setPartitions(partitionEntry.getValue()));
+.setName(partitionEntry.getKey())
+.setPartitions(partitionEntry.getValue()));
 }
-
-this.data = new AddPartitionsToTxnRequestData()
-.setTransactionalId(transactionalId)
-.setProducerId(producerId)
-.setProducerEpoch(producerEpoch)
-.setTopics(topics);
+return topics;
 }
 @Override
 public AddPartitionsToTxnRequest build(short version) {
-return new AddPartitionsToTxnRequest(data, version);
+short clampedVersion = (isClientRequest && version > 3) ? 3 : version;

Review Comment:
It comes from `AbstractRequest.Builder`. You can pass the min and max versions in the constructor.

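A simplified sketch of that suggestion follows: the version bounds are fixed when the builder is constructed, so `build` never has to clamp. The `Builder` class below is a hypothetical stand-in, not Kafka's `AbstractRequest.Builder`, and the concrete bounds (0 to 3 for clients, 0 to 4 for brokers) are assumptions taken from the clamping code under review.

```java
// Hypothetical stand-in for a request builder with version bounds; not Kafka's class.
final class VersionBoundsSketch {

    static class Builder {
        private final short oldestAllowedVersion;
        private final short latestAllowedVersion;

        Builder(short oldestAllowedVersion, short latestAllowedVersion) {
            this.oldestAllowedVersion = oldestAllowedVersion;
            this.latestAllowedVersion = latestAllowedVersion;
        }

        /** Highest version acceptable to both the builder and the broker's advertised range. */
        int pickVersion(int brokerMin, int brokerMax) {
            int highest = Math.min(latestAllowedVersion, brokerMax);
            int lowest = Math.max(oldestAllowedVersion, brokerMin);
            if (highest < lowest) {
                throw new IllegalStateException("no overlapping version range");
            }
            return highest;
        }
    }

    /** Assumed: client requests stay on the single-transaction format (up to v3). */
    static Builder forClient() {
        return new Builder((short) 0, (short) 3);
    }

    /** Assumed: broker-side requests may use the batched format (up to v4). */
    static Builder forBroker() {
        return new Builder((short) 0, (short) 4);
    }
}
```

Against a broker advertising versions 0 to 4, `forClient().pickVersion(0, 4)` yields 3 and `forBroker().pickVersion(0, 4)` yields 4, without any special case inside `build`.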
hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107606857

## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ##
@@ -35,21 +44,43 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
 private final AddPartitionsToTxnRequestData data;
 private List cachedPartitions = null;
+
+private Map> cachedPartitionsByTransaction = null;
+
+private final short version;
 public static class Builder extends AbstractRequest.Builder {
 public final AddPartitionsToTxnRequestData data;
+public final boolean isClientRequest;
-public Builder(final AddPartitionsToTxnRequestData data) {
+// Only used for versions < 4
+public Builder(String transactionalId,
+ long producerId,
+ short producerEpoch,
+ List partitions) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-this.data = data;
+this.isClientRequest = true;
+
+AddPartitionsToTxnTopicCollection topics = compileTopics(partitions);
+
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setTopics(topics);
 }
-public Builder(final String transactionalId,
- final long producerId,
- final short producerEpoch,
- final List partitions) {
+public Builder(AddPartitionsToTxnTransactionCollection transactions,
+ boolean verifyOnly) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+this.isClientRequest = false;
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactions(transactions)
+.setVerifyOnly(verifyOnly);
+}
+
+private AddPartitionsToTxnTopicCollection compileTopics(final List partitions) {

Review Comment:
nit: how about `buildTxnTopicCollection`?

## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ##
@@ -35,21 +44,43 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
 private final AddPartitionsToTxnRequestData data;
 private List cachedPartitions = null;
+
+private Map> cachedPartitionsByTransaction = null;
+
+private final short version;
 public static class Builder extends AbstractRequest.Builder {
 public final AddPartitionsToTxnRequestData data;
+public final boolean isClientRequest;
-public Builder(final AddPartitionsToTxnRequestData data) {
+// Only used for versions < 4
+public Builder(String transactionalId,
+ long producerId,
+ short producerEpoch,
+ List partitions) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-this.data = data;
+this.isClientRequest = true;
+
+AddPartitionsToTxnTopicCollection topics = compileTopics(partitions);
+
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactionalId(transactionalId)

Review Comment:
Does it make sense to set verifyOnly to false explicitly here?

## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ##
@@ -66,24 +97,22 @@ public Builder(final String transactionalId,
 AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
 for (Map.Entry> partitionEntry : partitionMap.entrySet()) {
 topics.add(new AddPartitionsToTxnTopic()
- .setName(partitionEntry.getKey())
- .setPartitions(partitionEntry.getValue()));
+.setName(partitionEntry.getKey())
+.setPartitions(partitionEntry.getValue()));
 }
-
-this.data = new AddPartitionsToTxnRequestData()
-.setTransactionalId(transactionalId)
-.setProducerId(producerId)
-.setProducerEpoch(producerEpoch)
-.setTopics(topics);
+return topics;
 }
 @Override
 public AddPartitionsToTxnRequest build(short version) {
-return new AddPartitionsToTxnRequest(data, version);
+short clampedVersion = (isClientRequest && version > 3) ? 3 : version;

Review Comment:
It's a little strange to ignore the version.
I think another way to do this is to set the `latestAllowedVersion` to 3 in the client builder. That will ensure that the client does not try to use a higher version even if the broker su