Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk
Conflicts:
core/src/main/scala/kafka/controller/KafkaController.scala
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f473566
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f473566
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f473566
Branch: refs/heads/trunk
Commit: 3f473566ab2ef51f17034dc02f632df5d38fe307
Parents: 72610d1 26a02c3
Author: Sriram Subramanian <[email protected]>
Authored: Mon Jan 27 13:25:06 2014 -0800
Committer: Sriram Subramanian <[email protected]>
Committed: Mon Jan 27 13:25:06 2014 -0800
----------------------------------------------------------------------
bin/zookeeper-shell.sh | 6 +-
.../main/scala/kafka/admin/TopicCommand.scala | 10 +-
.../kafka/api/ControlledShutdownRequest.scala | 16 +-
.../kafka/api/ControlledShutdownResponse.scala | 3 +
.../src/main/scala/kafka/api/FetchRequest.scala | 26 ++-
.../scala/kafka/api/LeaderAndIsrRequest.scala | 23 +-
.../scala/kafka/api/LeaderAndIsrResponse.scala | 3 +
.../scala/kafka/api/OffsetCommitRequest.scala | 16 ++
.../scala/kafka/api/OffsetCommitResponse.scala | 3 +
.../scala/kafka/api/OffsetFetchRequest.scala | 18 +-
.../scala/kafka/api/OffsetFetchResponse.scala | 4 +-
.../main/scala/kafka/api/OffsetRequest.scala | 23 +-
.../main/scala/kafka/api/OffsetResponse.scala | 1 +
.../main/scala/kafka/api/ProducerRequest.scala | 24 ++-
.../main/scala/kafka/api/ProducerResponse.scala | 2 +
.../scala/kafka/api/RequestOrResponse.scala | 8 +-
.../scala/kafka/api/StopReplicaRequest.scala | 25 ++-
.../scala/kafka/api/StopReplicaResponse.scala | 2 +
.../scala/kafka/api/TopicMetadataRequest.scala | 21 +-
.../scala/kafka/api/TopicMetadataResponse.scala | 2 +
.../scala/kafka/api/UpdateMetadataRequest.scala | 19 +-
.../kafka/api/UpdateMetadataResponse.scala | 2 +
.../main/scala/kafka/cluster/Partition.scala | 19 +-
.../common/LogCleaningAbortedException.scala | 24 +++
.../common/OptimisticLockFailureException.scala | 23 --
.../kafka/common/ThreadShutdownException.scala | 24 +++
.../controller/ControllerChannelManager.scala | 53 +++--
.../kafka/controller/KafkaController.scala | 198 +++++++++++-------
.../controller/PartitionLeaderSelector.scala | 40 ++--
.../controller/PartitionStateMachine.scala | 22 +-
.../kafka/controller/ReplicaStateMachine.scala | 35 ++--
.../kafka/javaapi/TopicMetadataRequest.scala | 25 +++
core/src/main/scala/kafka/log/Log.scala | 37 ++--
core/src/main/scala/kafka/log/LogCleaner.scala | 208 +++++++------------
.../scala/kafka/log/LogCleanerManager.scala | 188 +++++++++++++++++
core/src/main/scala/kafka/log/LogManager.scala | 76 ++++---
.../scala/kafka/network/RequestChannel.scala | 6 +-
.../scala/kafka/producer/SyncProducer.scala | 5 -
.../kafka/server/AbstractFetcherThread.scala | 10 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 5 +-
.../kafka/server/KafkaRequestHandler.scala | 1 +
.../kafka/server/ReplicaFetcherThread.scala | 7 +-
.../scala/kafka/server/ReplicaManager.scala | 18 +-
.../kafka/server/ZookeeperLeaderElector.scala | 36 ++--
core/src/main/scala/kafka/utils/ZkUtils.scala | 43 ----
.../test/scala/unit/kafka/admin/AdminTest.scala | 15 +-
.../test/scala/unit/kafka/log/CleanerTest.scala | 39 ++--
examples/README | 6 +-
system_test/utils/kafka_system_test_utils.py | 6 +-
49 files changed, 913 insertions(+), 513 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f473566/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/controller/KafkaController.scala
index ca2f09b,a0267ae..f4f00b2
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@@ -488,7 -546,7 +546,7 @@@ class KafkaController(val config : Kafk
}
}
-- def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
++ def onPreferredReplicaElection(partitions: Set[TopicAndPartition],
isTriggeredByAutoRebalance: Boolean = true) {
info("Starting preferred replica leader election for partitions
%s".format(partitions.mkString(",")))
try {
controllerContext.partitionsUndergoingPreferredReplicaElection ++=
partitions
@@@ -496,7 -554,7 +554,7 @@@
} catch {
case e: Throwable => error("Error completing preferred replica leader
election for partitions %s".format(partitions.mkString(",")), e)
} finally {
-- removePartitionsFromPreferredReplicaElection(partitions)
++ removePartitionsFromPreferredReplicaElection(partitions,
isTriggeredByAutoRebalance)
}
}
@@@ -765,7 -823,7 +823,8 @@@
}
}
-- def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved:
Set[TopicAndPartition]) {
++ def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved:
Set[TopicAndPartition],
++ isTriggeredByAutoRebalance
: Boolean) {
for(partition <- partitionsToBeRemoved) {
// check the status
val currentLeader =
controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@@ -776,7 -834,7 +835,8 @@@
warn("Partition %s failed to complete preferred replica leader
election. Leader is %d".format(partition, currentLeader))
}
}
-- ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
++ if (!isTriggeredByAutoRebalance)
++ ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
controllerContext.partitionsUndergoingPreferredReplicaElection --=
partitionsToBeRemoved
}
@@@ -966,26 -1018,26 +1020,16 @@@
// check ratio and if greater than desired ratio, trigger a
rebalance for the topic partitions
// that need to be on this broker
if (imbalanceRatio >
(config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-- controllerContext.controllerLock synchronized {
-- // do this check only if the broker is live and there are no
partitions being reassigned currently
-- // and preferred replica election is not in progress
-- if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-- controllerContext.partitionsBeingReassigned.size == 0 &&
--
controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
-- val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
-- val partitionsList = topicsNotInPreferredReplica.keys.map(e
=> Map("topic" -> e.topic, "partition" -> e.partition))
-- val jsonData = Json.encode(Map("version" -> 1, "partitions"
-> partitionsList))
-- try {
-- ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
-- info("Created preferred replica election path with
%s".format(jsonData))
-- } catch {
-- case e2: ZkNodeExistsException =>
-- val partitionsUndergoingPreferredReplicaElection =
--
PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient,
zkPath)._1)
-- error("Preferred replica leader election currently in
progress for " +
-- "%s. Aborting
operation".format(partitionsUndergoingPreferredReplicaElection));
-- case e3: Throwable =>
-- error("Error while trying to auto rebalance topics
%s".format(topicsNotInPreferredReplica.keys))
++ topicsNotInPreferredReplica.foreach {
++ case(topicPartition, replicas) => {
++ controllerContext.controllerLock synchronized {
++ // do this check only if the broker is live and there are
no partitions being reassigned currently
++ // and preferred replica election is not in progress
++ if (controllerContext.liveBrokerIds.contains(leaderBroker)
&&
++ controllerContext.partitionsBeingReassigned.size == 0 &&
++
controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
++ onPreferredReplicaElection(Set(topicPartition), false)
++ }
}
}
}