I often see the following exception while running some tests
(ProducerFailureHandlingTest.testNoResponse is one such instance):
[2015-01-19 22:30:24,257] ERROR [Controller-0-to-broker-1-send-thread], Controller 0 fails to send a request to broker id:1,host:localhost,port:56729 (kafka.controller.RequestSendThread:103)
java.lang.NullPointerException
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:150)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Looking at the code in question, I can see that the NPE can be triggered
when "receive" is null, which can happen if "isRunning" is false
(i.e. a shutdown has been requested). The fix to prevent this seems
straightforward:
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index eb492f0..10f4c5a 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -144,20 +144,22 @@ class RequestSendThread(val controllerId: Int,
Utils.swallow(Thread.sleep(300))
}
}
- var response: RequestOrResponse = null
- request.requestId.get match {
- case RequestKeys.LeaderAndIsrKey =>
- response = LeaderAndIsrResponse.readFrom(receive.buffer)
- case RequestKeys.StopReplicaKey =>
- response = StopReplicaResponse.readFrom(receive.buffer)
- case RequestKeys.UpdateMetadataKey =>
- response = UpdateMetadataResponse.readFrom(receive.buffer)
- }
- stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
- .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
+ if (receive != null) {
+ var response: RequestOrResponse = null
+ request.requestId.get match {
+ case RequestKeys.LeaderAndIsrKey =>
+ response = LeaderAndIsrResponse.readFrom(receive.buffer)
+ case RequestKeys.StopReplicaKey =>
+ response = StopReplicaResponse.readFrom(receive.buffer)
+ case RequestKeys.UpdateMetadataKey =>
+ response = UpdateMetadataResponse.readFrom(receive.buffer)
+ }
+ stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
+ .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
- if(callback != null) {
- callback(response)
+ if (callback != null) {
+ callback(response)
+ }
}
}
However, can this really be considered a fix, or would it just be hiding
the real issue? Is there something more that would have to be done in
this case? I'm on trunk, FWIW.
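
To make the failure mode concrete, here is a minimal standalone sketch of
what I think is happening. It does not use the real Kafka classes: Receive,
NullGuardSketch, sendAndReceive and handle are hypothetical names, and I am
assuming the send loop simply exits without assigning "receive" once
"isRunning" is false. It uses Option(...) as one way to express the same
null guard as the diff above.

```scala
// Hypothetical stand-in for the received response; not the real Kafka type.
final case class Receive(buffer: String)

object NullGuardSketch {
  // Assumed to be flipped to false when a shutdown is requested.
  @volatile var isRunning: Boolean = true

  // Mimics a send loop that is skipped entirely during shutdown,
  // in which case "receive" is never assigned and null is returned.
  def sendAndReceive(attempt: () => Receive): Receive = {
    var receive: Receive = null
    var done = false
    while (isRunning && !done) {
      receive = attempt()
      done = true
    }
    receive
  }

  // Option(null) is None, so the response is only processed when
  // something was actually received. Returns whether it was handled.
  def handle(receive: Receive, callback: String => Unit): Boolean =
    Option(receive) match {
      case Some(r) =>
        if (callback != null) callback(r.buffer)
        true
      case None =>
        false
    }
}
```

With isRunning set to false, sendAndReceive returns null and handle skips
the response instead of throwing, which is the same behaviour the guard in
the diff gives.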
-Jaikiran