jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1124830314
##########
clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponseTest.java:
##########
@@ -58,42 +65,75 @@ public void setUp() {
errorsMap.put(tp2, errorTwo);
}
- @Test
- public void testConstructorWithErrorResponse() {
- AddPartitionsToTxnResponse response = new
AddPartitionsToTxnResponse(throttleTimeMs, errorsMap);
-
- assertEquals(expectedErrorCounts, response.errorCounts());
- assertEquals(throttleTimeMs, response.throttleTimeMs());
- }
-
- @Test
- public void testParse() {
-
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
+ public void testParse(short version) {
AddPartitionsToTxnTopicResultCollection topicCollection = new
AddPartitionsToTxnTopicResultCollection();
AddPartitionsToTxnTopicResult topicResult = new
AddPartitionsToTxnTopicResult();
topicResult.setName(topicOne);
- topicResult.results().add(new AddPartitionsToTxnPartitionResult()
- .setErrorCode(errorOne.code())
+ topicResult.resultsByPartition().add(new
AddPartitionsToTxnPartitionResult()
+ .setPartitionErrorCode(errorOne.code())
.setPartitionIndex(partitionOne));
- topicResult.results().add(new AddPartitionsToTxnPartitionResult()
- .setErrorCode(errorTwo.code())
+ topicResult.resultsByPartition().add(new
AddPartitionsToTxnPartitionResult()
+ .setPartitionErrorCode(errorTwo.code())
.setPartitionIndex(partitionTwo));
topicCollection.add(topicResult);
+
+ if (version < 4) {
+ AddPartitionsToTxnResponseData data = new
AddPartitionsToTxnResponseData()
+ .setResultsByTopicV3AndBelow(topicCollection)
+ .setThrottleTimeMs(throttleTimeMs);
+ AddPartitionsToTxnResponse response = new
AddPartitionsToTxnResponse(data);
- AddPartitionsToTxnResponseData data = new
AddPartitionsToTxnResponseData()
- .setResults(topicCollection)
-
.setThrottleTimeMs(throttleTimeMs);
- AddPartitionsToTxnResponse response = new
AddPartitionsToTxnResponse(data);
-
- for (short version : ApiKeys.ADD_PARTITIONS_TO_TXN.allVersions()) {
AddPartitionsToTxnResponse parsedResponse =
AddPartitionsToTxnResponse.parse(response.serialize(version), version);
assertEquals(expectedErrorCounts, parsedResponse.errorCounts());
assertEquals(throttleTimeMs, parsedResponse.throttleTimeMs());
assertEquals(version >= 1,
parsedResponse.shouldClientThrottle(version));
+ } else {
+ AddPartitionsToTxnResultCollection results = new
AddPartitionsToTxnResultCollection();
+ results.add(new
AddPartitionsToTxnResult().setTransactionalId("txn1").setTopicResults(topicCollection));
+
+ // Create another transaction with new name and errorOne for a
single partition.
+ Map<TopicPartition, Errors> txnTwoExpectedErrors =
Collections.singletonMap(tp2, errorOne);
+
results.add(AddPartitionsToTxnResponse.resultForTransaction("txn2",
txnTwoExpectedErrors));
+
+ AddPartitionsToTxnResponseData data = new
AddPartitionsToTxnResponseData()
+ .setResultsByTransaction(results)
+ .setThrottleTimeMs(throttleTimeMs);
+ AddPartitionsToTxnResponse response = new
AddPartitionsToTxnResponse(data);
+
+ Map<Errors, Integer> newExpectedErrorCounts = new HashMap<>();
+ newExpectedErrorCounts.put(Errors.NONE, 1); // top level error
+ newExpectedErrorCounts.put(errorOne, 2);
+ newExpectedErrorCounts.put(errorTwo, 1);
+
+ AddPartitionsToTxnResponse parsedResponse =
AddPartitionsToTxnResponse.parse(response.serialize(version), version);
+ assertEquals(txnTwoExpectedErrors,
errorsForTransaction(response.getTransactionTopicResults("txn2")));
+ assertEquals(newExpectedErrorCounts, parsedResponse.errorCounts());
+ assertEquals(throttleTimeMs, parsedResponse.throttleTimeMs());
+ assertTrue(parsedResponse.shouldClientThrottle(version));
}
}
+
+ @Test
+ public void testBatchedErrors() {
+ Map<TopicPartition, Errors> txn1Errors = Collections.singletonMap(tp1,
errorOne);
+ Map<TopicPartition, Errors> txn2Errors = Collections.singletonMap(tp1,
errorOne);
+
+ AddPartitionsToTxnResult transaction1 =
AddPartitionsToTxnResponse.resultForTransaction("txn1", txn1Errors);
+ AddPartitionsToTxnResult transaction2 =
AddPartitionsToTxnResponse.resultForTransaction("txn2", txn2Errors);
+
+ AddPartitionsToTxnResultCollection results = new
AddPartitionsToTxnResultCollection();
+ results.add(transaction1);
+ results.add(transaction2);
+
+ AddPartitionsToTxnResponse response = new
AddPartitionsToTxnResponse(new
AddPartitionsToTxnResponseData().setResultsByTransaction(results));
+
+ assertEquals(txn1Errors,
errorsForTransaction(response.getTransactionTopicResults("txn1")));
+ assertEquals(txn2Errors,
errorsForTransaction(response.getTransactionTopicResults("txn2")));
+ }
Review Comment:
I tested in other tests, but I can put one here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]