Repository: kafka Updated Branches: refs/heads/trunk b6c6291e1 -> 655367971
KAFKA-3427: broker can return incorrect version of fetch response when the broker hits an unknown exception Author: Jun Rao <[email protected]> Reviewers: Ismael Juma, Becket Qin Closes #1101 from junrao/kafka-3427 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65536797 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65536797 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65536797 Branch: refs/heads/trunk Commit: 655367971875a8d6a079ff7d186c05a6b76a9c53 Parents: b6c6291 Author: Jun Rao <[email protected]> Authored: Sat Mar 19 18:04:56 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Sat Mar 19 18:04:56 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/api/FetchRequest.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/65536797/core/src/main/scala/kafka/api/FetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index f47942c..83e139a 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -150,7 +150,8 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, case (topicAndPartition, data) => (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty)) } - val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData) + val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] + val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId) // Magic value does not matter here because the message set is empty requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse))) }
