Updated Branches: refs/heads/0.8 e93937c88 -> 32cd8994b
kafka-907; controller needs to close socket channel to brokers on exception ; patched by Jun Rao; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32cd8994 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32cd8994 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32cd8994 Branch: refs/heads/0.8 Commit: 32cd8994bf35b65a8053e0caea9a7710cc889df7 Parents: e93937c Author: Jun Rao <[email protected]> Authored: Wed May 22 16:23:21 2013 -0700 Committer: Jun Rao <[email protected]> Committed: Wed May 22 16:23:21 2013 -0700 ---------------------------------------------------------------------- .../controller/ControllerChannelManager.scala | 6 ++++-- 1 files changed, 4 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/32cd8994/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 0c41d1d..38b8674 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -122,6 +122,7 @@ class RequestSendThread(val controllerId: Int, try{ lock synchronized { + channel.connect() // establish a socket connection if needed channel.send(request) receive = channel.receive() var response: RequestOrResponse = null @@ -142,8 +143,9 @@ class RequestSendThread(val controllerId: Int, } } catch { case e => - // log it and let it go. Let controller shut it down. - debug("Exception occurs", e) + warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e) + // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated. + channel.disconnect() } } }
