Repository: kafka Updated Branches: refs/heads/trunk 4cdb96f4c -> 53651937f
HOTFIX: Fix verbose logging in ControllerChannelManager.brokerReady Author: Jason Gustafson <ja...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #1786 from hachikuji/hotfix-ctrlchannelmgr-verbose-logging Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53651937 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53651937 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53651937 Branch: refs/heads/trunk Commit: 53651937fa8a7750daf44d7e88e616f01e2e37d3 Parents: 4cdb96f Author: Jason Gustafson <ja...@confluent.io> Authored: Fri Aug 26 00:05:22 2016 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Fri Aug 26 00:05:22 2016 +0100 ---------------------------------------------------------------------- .../controller/ControllerChannelManager.scala | 9 +++++---- .../kafka/utils/NetworkClientBlockingOps.scala | 18 ++++++++++++++---- 2 files changed, 19 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/53651937/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index c46a536..03cd98c 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -226,12 +226,13 @@ class RequestSendThread(val controllerId: Int, private def brokerReady(): Boolean = { import NetworkClientBlockingOps._ try { - val ready = networkClient.blockingReady(brokerNode, socketTimeoutMs)(time) + if (!networkClient.isReady(brokerNode)(time)) { + if (!networkClient.blockingReady(brokerNode, socketTimeoutMs)(time)) + throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms") - if (!ready) - throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms") + info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString())) + } - info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString())) true } catch { case e: Throwable => http://git-wip-us.apache.org/repos/asf/kafka/blob/53651937/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala index 9aca663..9b0828f 100644 --- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala +++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala @@ -45,6 +45,19 @@ object NetworkClientBlockingOps { class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal { /** + * Checks whether the node is currently connected, first calling `client.poll` to ensure that any pending + * disconnects have been processed. + * + * This method can be used to check the status of a connection prior to calling `blockingReady` to be able + * to tell whether the latter completed a new connection. + */ + def isReady(node: Node)(implicit time: JTime): Boolean = { + val currentTime = time.milliseconds() + client.poll(0, currentTime) + client.isReady(node, currentTime) + } + + /** * Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll` * invocations until the connection to `node` is ready, the timeout expires or the connection fails. * @@ -77,10 +90,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal { } } - // poll once to receive pending disconnects - client.poll(0, startTime) - - client.ready(node, startTime) || awaitReady(startTime) + isReady(node) || client.ready(node, startTime) || awaitReady(startTime) } /**