jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1124830314
##########
clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponseTest.java:
##########
@@ -58,42 +65,75 @@ public void setUp() {
         errorsMap.put(tp2, errorTwo);
     }
 
-    @Test
-    public void testConstructorWithErrorResponse() {
-        AddPartitionsToTxnResponse response = new AddPartitionsToTxnResponse(throttleTimeMs, errorsMap);
-
-        assertEquals(expectedErrorCounts, response.errorCounts());
-        assertEquals(throttleTimeMs, response.throttleTimeMs());
-    }
-
-    @Test
-    public void testParse() {
-
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
+    public void testParse(short version) {
         AddPartitionsToTxnTopicResultCollection topicCollection = new AddPartitionsToTxnTopicResultCollection();
 
         AddPartitionsToTxnTopicResult topicResult = new AddPartitionsToTxnTopicResult();
         topicResult.setName(topicOne);
 
-        topicResult.results().add(new AddPartitionsToTxnPartitionResult()
-                                      .setErrorCode(errorOne.code())
+        topicResult.resultsByPartition().add(new AddPartitionsToTxnPartitionResult()
+                                      .setPartitionErrorCode(errorOne.code())
                                       .setPartitionIndex(partitionOne));
 
-        topicResult.results().add(new AddPartitionsToTxnPartitionResult()
-                                      .setErrorCode(errorTwo.code())
+        topicResult.resultsByPartition().add(new AddPartitionsToTxnPartitionResult()
+                                      .setPartitionErrorCode(errorTwo.code())
                                       .setPartitionIndex(partitionTwo));
 
         topicCollection.add(topicResult);
+
+        if (version < 4) {
+            AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData()
+                    .setResultsByTopicV3AndBelow(topicCollection)
+                    .setThrottleTimeMs(throttleTimeMs);
+            AddPartitionsToTxnResponse response = new AddPartitionsToTxnResponse(data);
 
-        AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData()
-                                                  .setResults(topicCollection)
-                                                  .setThrottleTimeMs(throttleTimeMs);
-        AddPartitionsToTxnResponse response = new AddPartitionsToTxnResponse(data);
-
-        for (short version : ApiKeys.ADD_PARTITIONS_TO_TXN.allVersions()) {
             AddPartitionsToTxnResponse parsedResponse = AddPartitionsToTxnResponse.parse(response.serialize(version), version);
             assertEquals(expectedErrorCounts, parsedResponse.errorCounts());
             assertEquals(throttleTimeMs, parsedResponse.throttleTimeMs());
             assertEquals(version >= 1, parsedResponse.shouldClientThrottle(version));
+        } else {
+            AddPartitionsToTxnResultCollection results = new AddPartitionsToTxnResultCollection();
+            results.add(new AddPartitionsToTxnResult().setTransactionalId("txn1").setTopicResults(topicCollection));
+
+            // Create another transaction with new name and errorOne for a single partition.
+            Map<TopicPartition, Errors> txnTwoExpectedErrors = Collections.singletonMap(tp2, errorOne);
+            results.add(AddPartitionsToTxnResponse.resultForTransaction("txn2", txnTwoExpectedErrors));
+
+            AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData()
+                    .setResultsByTransaction(results)
+                    .setThrottleTimeMs(throttleTimeMs);
+            AddPartitionsToTxnResponse response = new AddPartitionsToTxnResponse(data);
+
+            Map<Errors, Integer> newExpectedErrorCounts = new HashMap<>();
+            newExpectedErrorCounts.put(Errors.NONE, 1); // top level error
+            newExpectedErrorCounts.put(errorOne, 2);
+            newExpectedErrorCounts.put(errorTwo, 1);
+
+            AddPartitionsToTxnResponse parsedResponse = AddPartitionsToTxnResponse.parse(response.serialize(version), version);
+            assertEquals(txnTwoExpectedErrors, errorsForTransaction(response.getTransactionTopicResults("txn2")));
+            assertEquals(newExpectedErrorCounts, parsedResponse.errorCounts());
+            assertEquals(throttleTimeMs, parsedResponse.throttleTimeMs());
+            assertTrue(parsedResponse.shouldClientThrottle(version));
         }
     }
+
+    @Test
+    public void testBatchedErrors() {
+        Map<TopicPartition, Errors> txn1Errors = Collections.singletonMap(tp1, errorOne);
+        Map<TopicPartition, Errors> txn2Errors = Collections.singletonMap(tp1, errorOne);
+
+        AddPartitionsToTxnResult transaction1 = AddPartitionsToTxnResponse.resultForTransaction("txn1", txn1Errors);
+        AddPartitionsToTxnResult transaction2 = AddPartitionsToTxnResponse.resultForTransaction("txn2", txn2Errors);
+
+        AddPartitionsToTxnResultCollection results = new AddPartitionsToTxnResultCollection();
+        results.add(transaction1);
+        results.add(transaction2);
+
+        AddPartitionsToTxnResponse response = new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData().setResultsByTransaction(results));
+
+        assertEquals(txn1Errors, errorsForTransaction(response.getTransactionTopicResults("txn1")));
+        assertEquals(txn2Errors, errorsForTransaction(response.getTransactionTopicResults("txn2")));
+    }

Review Comment:
I tested in other tests, but I can put one here.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org