jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1124830314


##########
clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponseTest.java:
##########
@@ -58,42 +65,75 @@ public void setUp() {
         errorsMap.put(tp2, errorTwo);
     }
 
-    @Test
-    public void testConstructorWithErrorResponse() {
-        AddPartitionsToTxnResponse response = new 
AddPartitionsToTxnResponse(throttleTimeMs, errorsMap);
-
-        assertEquals(expectedErrorCounts, response.errorCounts());
-        assertEquals(throttleTimeMs, response.throttleTimeMs());
-    }
-
-    @Test
-    public void testParse() {
-
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
+    public void testParse(short version) {
         AddPartitionsToTxnTopicResultCollection topicCollection = new 
AddPartitionsToTxnTopicResultCollection();
 
         AddPartitionsToTxnTopicResult topicResult = new 
AddPartitionsToTxnTopicResult();
         topicResult.setName(topicOne);
 
-        topicResult.results().add(new AddPartitionsToTxnPartitionResult()
-                                      .setErrorCode(errorOne.code())
+        topicResult.resultsByPartition().add(new 
AddPartitionsToTxnPartitionResult()
+                                      .setPartitionErrorCode(errorOne.code())
                                       .setPartitionIndex(partitionOne));
 
-        topicResult.results().add(new AddPartitionsToTxnPartitionResult()
-                                      .setErrorCode(errorTwo.code())
+        topicResult.resultsByPartition().add(new 
AddPartitionsToTxnPartitionResult()
+                                      .setPartitionErrorCode(errorTwo.code())
                                       .setPartitionIndex(partitionTwo));
 
         topicCollection.add(topicResult);
+            
+        if (version < 4) {
+            AddPartitionsToTxnResponseData data = new 
AddPartitionsToTxnResponseData()
+                    .setResultsByTopicV3AndBelow(topicCollection)
+                    .setThrottleTimeMs(throttleTimeMs);
+            AddPartitionsToTxnResponse response = new 
AddPartitionsToTxnResponse(data);
 
-        AddPartitionsToTxnResponseData data = new 
AddPartitionsToTxnResponseData()
-                                                  .setResults(topicCollection)
-                                                  
.setThrottleTimeMs(throttleTimeMs);
-        AddPartitionsToTxnResponse response = new 
AddPartitionsToTxnResponse(data);
-
-        for (short version : ApiKeys.ADD_PARTITIONS_TO_TXN.allVersions()) {
             AddPartitionsToTxnResponse parsedResponse = 
AddPartitionsToTxnResponse.parse(response.serialize(version), version);
             assertEquals(expectedErrorCounts, parsedResponse.errorCounts());
             assertEquals(throttleTimeMs, parsedResponse.throttleTimeMs());
             assertEquals(version >= 1, 
parsedResponse.shouldClientThrottle(version));
+        } else {
+            AddPartitionsToTxnResultCollection results = new 
AddPartitionsToTxnResultCollection();
+            results.add(new 
AddPartitionsToTxnResult().setTransactionalId("txn1").setTopicResults(topicCollection));
+            
+            // Create another transaction with new name and errorOne for a single partition.
+            Map<TopicPartition, Errors> txnTwoExpectedErrors = 
Collections.singletonMap(tp2, errorOne);
+            
results.add(AddPartitionsToTxnResponse.resultForTransaction("txn2", 
txnTwoExpectedErrors));
+
+            AddPartitionsToTxnResponseData data = new 
AddPartitionsToTxnResponseData()
+                    .setResultsByTransaction(results)
+                    .setThrottleTimeMs(throttleTimeMs);
+            AddPartitionsToTxnResponse response = new 
AddPartitionsToTxnResponse(data);
+
+            Map<Errors, Integer> newExpectedErrorCounts = new HashMap<>();
+            newExpectedErrorCounts.put(Errors.NONE, 1); // top level error
+            newExpectedErrorCounts.put(errorOne, 2);
+            newExpectedErrorCounts.put(errorTwo, 1);
+            
+            AddPartitionsToTxnResponse parsedResponse = 
AddPartitionsToTxnResponse.parse(response.serialize(version), version);
+            assertEquals(txnTwoExpectedErrors, 
errorsForTransaction(response.getTransactionTopicResults("txn2")));
+            assertEquals(newExpectedErrorCounts, parsedResponse.errorCounts());
+            assertEquals(throttleTimeMs, parsedResponse.throttleTimeMs());
+            assertTrue(parsedResponse.shouldClientThrottle(version));
         }
     }
+    
+    @Test
+    public void testBatchedErrors() {
+        Map<TopicPartition, Errors> txn1Errors = Collections.singletonMap(tp1, 
errorOne);
+        Map<TopicPartition, Errors> txn2Errors = Collections.singletonMap(tp1, 
errorOne);
+        
+        AddPartitionsToTxnResult transaction1 = 
AddPartitionsToTxnResponse.resultForTransaction("txn1", txn1Errors);
+        AddPartitionsToTxnResult transaction2 = 
AddPartitionsToTxnResponse.resultForTransaction("txn2", txn2Errors);
+        
+        AddPartitionsToTxnResultCollection results = new 
AddPartitionsToTxnResultCollection();
+        results.add(transaction1);
+        results.add(transaction2);
+        
+        AddPartitionsToTxnResponse response = new 
AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setResultsByTransaction(results));
+        
+        assertEquals(txn1Errors, 
errorsForTransaction(response.getTransactionTopicResults("txn1")));
+        assertEquals(txn2Errors, 
errorsForTransaction(response.getTransactionTopicResults("txn2")));
+    }

Review Comment:
   I tested this in other tests, but I can add one here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to