Repository: kafka
Updated Branches:
  refs/heads/trunk 0273c4379 -> cbdd8218c
KAFKA-2756: Use request version Id instead of latest version Id to parse the corresponding response.

Author: Guozhang Wang <[email protected]>

Reviewers: Guozhang Wang

Closes #438 from guozhangwang/K2756

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cbdd8218
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cbdd8218
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cbdd8218

Branch: refs/heads/trunk
Commit: cbdd8218c16b56d952267c162dc1dfc93191571e
Parents: 0273c43
Author: Matthew Bruce <[email protected]>
Authored: Thu Nov 5 15:51:42 2015 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Thu Nov 5 15:51:42 2015 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/kafka/clients/NetworkClient.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/cbdd8218/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 2c56751..6c8853d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -458,8 +458,10 @@ public class NetworkClient implements KafkaClient {
             String source = receive.source();
             ClientRequest req = inFlightRequests.completeNext(source);
             ResponseHeader header = ResponseHeader.parse(receive.payload());
+            // Always expect the response version id to be the same as the request version id
             short apiKey = req.request().header().apiKey();
-            Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
+            short apiVer = req.request().header().apiVersion();
+            Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
             correlate(req.request().header(), header);
             if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                 responses.add(new ClientResponse(req, now, false, body));
