chia7712 commented on a change in pull request #10332: URL: https://github.com/apache/kafka/pull/10332#discussion_r596291543
########## File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java ########## @@ -40,42 +42,42 @@ public void produceResponseV5Test() { Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); TopicPartition tp0 = new TopicPartition("test", 0); - responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP, 100)); + responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100)); ProduceResponse v5Response = new ProduceResponse(responseData, 10); short version = 5; ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0); ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away. - ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, - buffer, version); - - assertEquals(1, v5FromBytes.responses().size()); - assertTrue(v5FromBytes.responses().containsKey(tp0)); - ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0); - assertEquals(100, partitionResponse.logStartOffset); - assertEquals(10000, partitionResponse.baseOffset); - assertEquals(10, v5FromBytes.throttleTimeMs()); - assertEquals(responseData, v5Response.responses()); + ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version); + + assertEquals(1, v5FromBytes.data().responses().size()); + ProduceResponseData.TopicProduceResponse topicProduceResponse = v5FromBytes.data().responses().iterator().next(); + assertEquals(1, topicProduceResponse.partitionResponses().size()); + ProduceResponseData.PartitionProduceResponse partitionProduceResponse = topicProduceResponse.partitionResponses().iterator().next(); + TopicPartition tp = new TopicPartition(topicProduceResponse.name(), partitionProduceResponse.index()); + assertEquals(tp0, tp); + + assertEquals(100, partitionProduceResponse.logStartOffset()); + assertEquals(10000, partitionProduceResponse.baseOffset()); + assertEquals(RecordBatch.NO_TIMESTAMP, partitionProduceResponse.logAppendTimeMs()); + assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode())); + assertEquals(null, partitionProduceResponse.errorMessage()); Review comment: Could you use `assertNull`? ########## File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java ########## @@ -40,42 +42,42 @@ public void produceResponseV5Test() { Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); TopicPartition tp0 = new TopicPartition("test", 0); - responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP, 100)); + responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100)); ProduceResponse v5Response = new ProduceResponse(responseData, 10); short version = 5; ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0); ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away. - ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, - buffer, version); - - assertEquals(1, v5FromBytes.responses().size()); - assertTrue(v5FromBytes.responses().containsKey(tp0)); - ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0); - assertEquals(100, partitionResponse.logStartOffset); - assertEquals(10000, partitionResponse.baseOffset); - assertEquals(10, v5FromBytes.throttleTimeMs()); - assertEquals(responseData, v5Response.responses()); + ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version); + + assertEquals(1, v5FromBytes.data().responses().size()); + ProduceResponseData.TopicProduceResponse topicProduceResponse = v5FromBytes.data().responses().iterator().next(); + assertEquals(1, topicProduceResponse.partitionResponses().size()); + ProduceResponseData.PartitionProduceResponse partitionProduceResponse = topicProduceResponse.partitionResponses().iterator().next(); + TopicPartition tp = new TopicPartition(topicProduceResponse.name(), partitionProduceResponse.index()); + assertEquals(tp0, tp); + + assertEquals(100, partitionProduceResponse.logStartOffset()); + assertEquals(10000, partitionProduceResponse.baseOffset()); + assertEquals(RecordBatch.NO_TIMESTAMP, partitionProduceResponse.logAppendTimeMs()); + assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode())); + assertEquals(null, partitionProduceResponse.errorMessage()); + assertTrue(partitionProduceResponse.recordErrors().isEmpty()); } @SuppressWarnings("deprecation") @Test public void produceResponseVersionTest() { Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>(); - responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, - 10000, RecordBatch.NO_TIMESTAMP, 100)); + responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE, 10000, RecordBatch.NO_TIMESTAMP, 100)); ProduceResponse v0Response = new ProduceResponse(responseData); ProduceResponse v1Response = new ProduceResponse(responseData, 10); ProduceResponse v2Response = new ProduceResponse(responseData, 10); assertEquals(0, v0Response.throttleTimeMs(), "Throttle time must be zero"); assertEquals(10, v1Response.throttleTimeMs(), "Throttle time must be 10"); assertEquals(10, v2Response.throttleTimeMs(), "Throttle time must be 10"); - assertEquals(responseData, v0Response.responses(), "Response data does not match"); Review comment: Could you add similar checks? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org