Repository: kafka
Updated Branches:
  refs/heads/trunk 995d0d369 -> 4aa3dab3d


KAFKA-1883 Fix NullPointerException in RequestSendThread; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4aa3dab3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4aa3dab3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4aa3dab3

Branch: refs/heads/trunk
Commit: 4aa3dab3de088096461941353ba27cb37f1bd9d1
Parents: 995d0d3
Author: jaikiran pai <[email protected]>
Authored: Sun Jan 25 18:54:51 2015 -0800
Committer: Neha Narkhede <[email protected]>
Committed: Sun Jan 25 18:54:58 2015 -0800

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   | 32 +++++++++++---------
 1 file changed, 17 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4aa3dab3/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index eb492f0..fbef34c 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -125,7 +125,7 @@ class RequestSendThread(val controllerId: Int,
     try {
       lock synchronized {
         var isSendSuccessful = false
-        while(isRunning.get() && !isSendSuccessful) {
+        while (isRunning.get() && !isSendSuccessful) {
           // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
           // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
           try {
@@ -136,7 +136,7 @@ class RequestSendThread(val controllerId: Int,
             case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
               warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                 "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
-                request.toString, toBroker.toString()), e)
+                  request.toString, toBroker.toString()), e)
               channel.disconnect()
               connectToBroker(toBroker, channel)
               isSendSuccessful = false
@@ -144,20 +144,22 @@ class RequestSendThread(val controllerId: Int,
               Utils.swallow(Thread.sleep(300))
           }
         }
-        var response: RequestOrResponse = null
-        request.requestId.get match {
-          case RequestKeys.LeaderAndIsrKey =>
-            response = LeaderAndIsrResponse.readFrom(receive.buffer)
-          case RequestKeys.StopReplicaKey =>
-            response = StopReplicaResponse.readFrom(receive.buffer)
-          case RequestKeys.UpdateMetadataKey =>
-            response = UpdateMetadataResponse.readFrom(receive.buffer)
-        }
-        stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
-                                  .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
+        if (receive != null) {
+          var response: RequestOrResponse = null
+          request.requestId.get match {
+            case RequestKeys.LeaderAndIsrKey =>
+              response = LeaderAndIsrResponse.readFrom(receive.buffer)
+            case RequestKeys.StopReplicaKey =>
+              response = StopReplicaResponse.readFrom(receive.buffer)
+            case RequestKeys.UpdateMetadataKey =>
+              response = UpdateMetadataResponse.readFrom(receive.buffer)
+          }
+          stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
+            .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
 
-        if(callback != null) {
-          callback(response)
+          if (callback != null) {
+            callback(response)
+          }
         }
       }
     } catch {

Reply via email to