I often see the following exception while running some tests (ProducerFailureHandlingTest.testNoResponse is one such instance):

[2015-01-19 22:30:24,257] ERROR [Controller-0-to-broker-1-send-thread], Controller 0 fails to send a request to broker id:1,host:localhost,port:56729 (kafka.controller.RequestSendThread:103)
java.lang.NullPointerException
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:150)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


Looking at that code in question, I can see that the NPE can be trigger when the "receive" is null which can happen if the "isRunning" is false (i.e a shutdown has been requested). The fix to prevent this seems straightforward:

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index eb492f0..10f4c5a 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -144,20 +144,22 @@ class RequestSendThread(val controllerId: Int,
               Utils.swallow(Thread.sleep(300))
           }
         }
-        var response: RequestOrResponse = null
-        request.requestId.get match {
-          case RequestKeys.LeaderAndIsrKey =>
-            response = LeaderAndIsrResponse.readFrom(receive.buffer)
-          case RequestKeys.StopReplicaKey =>
-            response = StopReplicaResponse.readFrom(receive.buffer)
-          case RequestKeys.UpdateMetadataKey =>
-            response = UpdateMetadataResponse.readFrom(receive.buffer)
-        }
- stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" - .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
+        if (receive != null) {
+          var response: RequestOrResponse = null
+          request.requestId.get match {
+            case RequestKeys.LeaderAndIsrKey =>
+              response = LeaderAndIsrResponse.readFrom(receive.buffer)
+            case RequestKeys.StopReplicaKey =>
+              response = StopReplicaResponse.readFrom(receive.buffer)
+            case RequestKeys.UpdateMetadataKey =>
+              response = UpdateMetadataResponse.readFrom(receive.buffer)
+          }
+ stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" + .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))

-        if(callback != null) {
-          callback(response)
+          if (callback != null) {
+            callback(response)
+          }
         }
       }


However can this really be considered a fix or would this just be hiding the real issue and would there be something more that will have to be done in this case? I'm on trunk FWIW.


-Jaikiran

Reply via email to