Repository: kafka Updated Branches: refs/heads/trunk 995d0d369 -> 4aa3dab3d
KAFKA-1883 Fix NullPointerException in RequestSendThread; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4aa3dab3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4aa3dab3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4aa3dab3 Branch: refs/heads/trunk Commit: 4aa3dab3de088096461941353ba27cb37f1bd9d1 Parents: 995d0d3 Author: jaikiran pai <[email protected]> Authored: Sun Jan 25 18:54:51 2015 -0800 Committer: Neha Narkhede <[email protected]> Committed: Sun Jan 25 18:54:58 2015 -0800 ---------------------------------------------------------------------- .../controller/ControllerChannelManager.scala | 32 +++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4aa3dab3/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index eb492f0..fbef34c 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -125,7 +125,7 @@ class RequestSendThread(val controllerId: Int, try { lock synchronized { var isSendSuccessful = false - while(isRunning.get() && !isSendSuccessful) { + while (isRunning.get() && !isSendSuccessful) { // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying. try { @@ -136,7 +136,7 @@ class RequestSendThread(val controllerId: Int, case e: Throwable => // if the send was not successful, reconnect to broker and resend the message warn(("Controller %d epoch %d fails to send request %s to broker %s. " + "Reconnecting to broker.").format(controllerId, controllerContext.epoch, - request.toString, toBroker.toString()), e) + request.toString, toBroker.toString()), e) channel.disconnect() connectToBroker(toBroker, channel) isSendSuccessful = false @@ -144,20 +144,22 @@ class RequestSendThread(val controllerId: Int, Utils.swallow(Thread.sleep(300)) } } - var response: RequestOrResponse = null - request.requestId.get match { - case RequestKeys.LeaderAndIsrKey => - response = LeaderAndIsrResponse.readFrom(receive.buffer) - case RequestKeys.StopReplicaKey => - response = StopReplicaResponse.readFrom(receive.buffer) - case RequestKeys.UpdateMetadataKey => - response = UpdateMetadataResponse.readFrom(receive.buffer) - } - stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" - .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString)) + if (receive != null) { + var response: RequestOrResponse = null + request.requestId.get match { + case RequestKeys.LeaderAndIsrKey => + response = LeaderAndIsrResponse.readFrom(receive.buffer) + case RequestKeys.StopReplicaKey => + response = StopReplicaResponse.readFrom(receive.buffer) + case RequestKeys.UpdateMetadataKey => + response = UpdateMetadataResponse.readFrom(receive.buffer) + } + stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" + .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString)) - if(callback != null) { - callback(response) + if (callback != null) { + callback(response) + } } } } catch {
