Repository: kafka Updated Branches: refs/heads/trunk cbdcd5f10 -> f20e5108a
kafka-1738; Partitions for topic not created after restart from forced shutdown; patched by Jun Rao; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f20e5108 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f20e5108 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f20e5108 Branch: refs/heads/trunk Commit: f20e5108a271e1fc37ca6752773efa74e2fef67f Parents: cbdcd5f Author: Jun Rao <[email protected]> Authored: Fri Nov 7 10:46:21 2014 -0800 Committer: Jun Rao <[email protected]> Committed: Fri Nov 7 10:46:21 2014 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/controller/ControllerChannelManager.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f20e5108/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ecbfa0f..eb492f0 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -130,10 +130,11 @@ class RequestSendThread(val controllerId: Int, // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying. try { channel.send(request) + receive = channel.receive() isSendSuccessful = true } catch { case e: Throwable => // if the send was not successful, reconnect to broker and resend the message - error(("Controller %d epoch %d failed to send request %s to broker %s. " + + warn(("Controller %d epoch %d fails to send request %s to broker %s. " + "Reconnecting to broker.").format(controllerId, controllerContext.epoch, request.toString, toBroker.toString()), e) channel.disconnect() @@ -143,7 +144,6 @@ class RequestSendThread(val controllerId: Int, Utils.swallow(Thread.sleep(300)) } } - receive = channel.receive() var response: RequestOrResponse = null request.requestId.get match { case RequestKeys.LeaderAndIsrKey => @@ -162,7 +162,7 @@ class RequestSendThread(val controllerId: Int, } } catch { case e: Throwable => - warn("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e) + error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e) // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated. channel.disconnect() }
