Repository: kafka Updated Branches: refs/heads/trunk fa6b90f97 -> 9f21837e9
KAFKA-3076; BrokerChangeListener should log the brokers in order Author: Konrad <[email protected]> Author: konradkalita <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]> Closes #749 from konradkalita/kafka-3076 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f21837e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f21837e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f21837e Branch: refs/heads/trunk Commit: 9f21837e9925cf768c95a29423f9481b50dbe21d Parents: fa6b90f Author: Konrad <[email protected]> Authored: Mon Jan 25 14:45:09 2016 -0800 Committer: Jun Rao <[email protected]> Committed: Mon Jan 25 14:45:09 2016 -0800 ---------------------------------------------------------------------- .../scala/kafka/controller/ReplicaStateMachine.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9f21837e/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 7ebece7..2fd8b95 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -352,7 +352,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { class BrokerChangeListener() extends IZkChildListener with Logging { this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: " def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) { - info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(","))) + info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(","))) inLock(controllerContext.controllerLock) { if (hasStarted.get) { ControllerStats.leaderElectionTimer.time { @@ -364,14 +364,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id)) controllerContext.liveBrokers = curBrokers + val newBrokerIdsSorted = newBrokerIds.toSeq.sorted + val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted + val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s" - .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(","))) + .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(","))) newBrokers.foreach(controllerContext.controllerChannelManager.addBroker) deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker) if(newBrokerIds.size > 0) - controller.onBrokerStartup(newBrokerIds.toSeq) + controller.onBrokerStartup(newBrokerIdsSorted) if(deadBrokerIds.size > 0) - controller.onBrokerFailure(deadBrokerIds.toSeq) + controller.onBrokerFailure(deadBrokerIdsSorted) } catch { case e: Throwable => error("Error while handling broker changes", e) }
