Repository: kafka
Updated Branches:
  refs/heads/0.10.1 f20aaaa66 -> 18e05117a


KAFKA-4216; Control Leader & Follower Throttled Replicas Separately

Splits the throttled replica configuration (the list of which replicas should 
be throttled for each topic) into two. One for the leader throttle, one for the 
follower throttle.

So:
 quota.replication.throttled.replicas
=>
quota.leader.replication.throttled.replicas & 
quota.follower.replication.throttled.replicas

Author: Ben Stopford <benstopf...@gmail.com>

Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #1906 from 
benstopford/KAFKA-4216-seperate-leader-and-follower-throttled-replica-lists

(cherry picked from commit 2ca9177f499a07262db0072bbbb252d3ca2dfb58)
Signed-off-by: Ismael Juma <ism...@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/18e05117
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/18e05117
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/18e05117

Branch: refs/heads/0.10.1
Commit: 18e05117a320fc2a2c6e63a2da29d565c72fb173
Parents: f20aaaa
Author: Ben Stopford <benstopf...@gmail.com>
Authored: Thu Sep 29 07:55:05 2016 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Thu Sep 29 08:57:38 2016 +0100

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala | 104 +++++++++------
 core/src/main/scala/kafka/log/LogConfig.scala   |  25 ++--
 .../main/scala/kafka/server/ConfigHandler.scala |  52 ++++----
 core/src/main/scala/kafka/utils/CoreUtils.scala |  12 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |  53 +++++---
 .../admin/ReassignPartitionsClusterTest.scala   |  30 ++++-
 .../admin/ReassignPartitionsCommandTest.scala   | 128 +++++++++++++------
 .../kafka/admin/ReplicationQuotaUtils.scala     |  11 +-
 .../scala/unit/kafka/log/LogConfigTest.scala    |  13 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |  28 +++-
 .../unit/kafka/server/DynamicConfigTest.scala   |  10 +-
 .../kafka/server/ReplicationQuotasTest.scala    |  14 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   7 -
 13 files changed, 315 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2fa75f6..dccc37c 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -16,14 +16,15 @@
  */
 package kafka.admin
 
-import java.util.Properties
 import joptsimple.OptionParser
-import kafka.log.LogConfig
 import kafka.server.{DynamicConfig, ConfigType}
 import kafka.utils._
 import scala.collection._
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import kafka.common.{TopicAndPartition, AdminCommandFailedException}
+import kafka.common.{AdminCommandFailedException, TopicAndPartition}
+import kafka.log.LogConfig
+import kafka.log.LogConfig._
+import kafka.utils.CoreUtils._
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.security.JaasUtils
 
@@ -74,7 +75,7 @@ object ReassignPartitionsCommand extends Logging {
     removeThrottle(zkUtils, partitionsToBeReassigned, 
reassignedPartitionsStatus)
   }
 
-  private def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: 
Map[TopicAndPartition, scala.Seq[Int]], reassignedPartitionsStatus: 
Map[TopicAndPartition, ReassignmentStatus]): Unit = {
+  private def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: 
Map[TopicAndPartition, Seq[Int]], reassignedPartitionsStatus: 
Map[TopicAndPartition, ReassignmentStatus]): Unit = {
     var changed = false
 
     //If all partitions have completed remove the throttle
@@ -92,7 +93,8 @@ object ReassignPartitionsCommand extends Logging {
       val topics = partitionsToBeReassigned.keySet.map(tp => 
tp.topic).toSeq.distinct
       for (topic <- topics) {
         val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, 
topic)
-        if (configs.remove(LogConfig.ThrottledReplicasListProp) != null) {
+        if (configs.remove(LogConfig.LeaderThrottledReplicasListProp) != null
+          || configs.remove(LogConfig.FollowerThrottledReplicasListProp) != 
null){
           AdminUtils.changeTopicConfig(zkUtils, topic, configs)
           changed = true
         }
@@ -145,7 +147,7 @@ object ReassignPartitionsCommand extends Logging {
   }
 
   def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, 
throttle: Long = -1) {
-    val partitionsToBeReassigned = parseAndValidate(reassignmentJsonString)
+    val partitionsToBeReassigned = parseAndValidate(zkUtils, 
reassignmentJsonString)
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, 
partitionsToBeReassigned.toMap)
 
     // If there is an existing rebalance running, attempt to change its 
throttle
@@ -164,15 +166,16 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
-  def printCurrentAssignment(zkUtils: ZkUtils, partitionsToBeReassigned: 
scala.Seq[(TopicAndPartition, scala.Seq[Int])]): Unit = {
+  def printCurrentAssignment(zkUtils: ZkUtils, partitionsToBeReassigned: 
Seq[(TopicAndPartition, Seq[Int])]): Unit = {
     // before starting assignment, output the current replica assignment to 
facilitate rollback
     val currentPartitionReplicaAssignment = 
zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic))
     println("Current partition replica assignment\n\n%s\n\nSave this to use as 
the --reassignment-json-file option during rollback"
       
.format(ZkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment)))
   }
 
-  def parseAndValidate(reassignmentJsonString: String): 
scala.Seq[(TopicAndPartition, scala.Seq[Int])] = {
+  def parseAndValidate(zkUtils: ZkUtils, reassignmentJsonString: String): 
Seq[(TopicAndPartition, Seq[Int])] = {
     val partitionsToBeReassigned = 
ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
+    
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file 
is empty")
     val duplicateReassignedPartitions = 
CoreUtils.duplicates(partitionsToBeReassigned.map { case (tp, _) => tp })
@@ -180,13 +183,21 @@ object ReassignPartitionsCommand extends Logging {
       throw new AdminCommandFailedException("Partition reassignment contains 
duplicate topic partitions: 
%s".format(duplicateReassignedPartitions.mkString(",")))
     val duplicateEntries = partitionsToBeReassigned
       .map { case (tp, replicas) => (tp, CoreUtils.duplicates(replicas))}
-      .filter { case (tp, duplicatedReplicas) => duplicatedReplicas.nonEmpty }
+      .filter { case (_, duplicatedReplicas) => duplicatedReplicas.nonEmpty }
     if (duplicateEntries.nonEmpty) {
       val duplicatesMsg = duplicateEntries
         .map { case (tp, duplicateReplicas) => "%s contains multiple entries 
for %s".format(tp, duplicateReplicas.mkString(",")) }
         .mkString(". ")
       throw new AdminCommandFailedException("Partition replica lists may not 
contain duplicate entries: %s".format(duplicatesMsg))
     }
+    //Check that all partitions in the proposed assignment exist in the cluster
+    val proposedTopics = partitionsToBeReassigned.map { case (tp, _) => 
tp.topic }.distinct
+    val existingAssignment = 
zkUtils.getReplicaAssignmentForTopics(proposedTopics)
+    val nonExistentPartitions = partitionsToBeReassigned.map { case (tp, _) => 
tp }.filterNot(existingAssignment.contains)
+    if (nonExistentPartitions.nonEmpty)
+      throw new AdminCommandFailedException("The proposed assignment contains 
non-existent partitions: " +
+        nonExistentPartitions)
+
     partitionsToBeReassigned
   }
 
@@ -289,20 +300,25 @@ object ReassignPartitionsCommand extends Logging {
   }
 }
 
-class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: 
Map[TopicAndPartition, Seq[Int]])
+class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: 
Map[TopicAndPartition, Seq[Int]])
   extends Logging {
 
+  def existingAssignment(): Map[TopicAndPartition, Seq[Int]] = {
+    val proposedTopics = proposedAssignment.keySet.map(_.topic).toSeq
+    zkUtils.getReplicaAssignmentForTopics(proposedTopics)
+  }
+
   private def maybeThrottle(throttle: Long): Unit = {
     if (throttle >= 0) {
       maybeLimit(throttle)
-      addThrottledReplicaList()
+      assignThrottledReplicas(existingAssignment(), proposedAssignment)
     }
   }
 
   def maybeLimit(throttle: Long) {
     if (throttle >= 0) {
-      val existingBrokers = 
zkUtils.getReplicaAssignmentForTopics(partitions.map(_._1.topic).toSeq).flatMap(_._2).toSeq
-      val proposedBrokers = partitions.flatMap(_._2).toSeq
+      val existingBrokers = existingAssignment().values.flatten.toSeq
+      val proposedBrokers = proposedAssignment.values.flatten.toSeq
       val brokers = (existingBrokers ++ proposedBrokers).distinct
 
       for (id <- brokers) {
@@ -310,41 +326,57 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, 
partitions: Map[TopicAndPartit
         configs.put(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, 
throttle.toString)
         AdminUtils.changeBrokerConfig(zkUtils, Seq(id), configs)
       }
-      println(f"The throttle limit was set to $throttle%,d B/s")
+      println(s"The throttle limit was set to $throttle B/s")
     }
   }
 
-  def addThrottledReplicaList(): Unit = {
-    //apply the throttle to all move destinations and all move sources
-    val existing = 
zkUtils.getReplicaAssignmentForTopics(partitions.map(_._1.topic).toSeq)
-    val moves = replicaMoves(existing, proposed = partitions)
-    for (topic <- partitions.keySet.map(tp => tp.topic).toSeq.distinct)
-      AdminUtils.changeTopicConfig(zkUtils, topic, new Properties {
-        put(LogConfig.ThrottledReplicasListProp, moves.get(topic).get )
-      })
-    println(s"Throttles were added to the following replicas: $moves")
+  private[admin] def assignThrottledReplicas(allExisting: 
Map[TopicAndPartition, Seq[Int]], allProposed: Map[TopicAndPartition, 
Seq[Int]], admin: AdminUtilities = AdminUtils): Unit = {
+    for (topic <- allProposed.keySet.map(_.topic).toSeq) {
+      val (existing, proposed) = filterBy(topic, allExisting, allProposed)
+
+      //Apply the leader throttle to all replicas that exist before the 
re-balance.
+      val leader = format(preRebalanceReplicaForMovingPartitions(existing, 
proposed))
+
+      //Apply a follower throttle to all "move destinations".
+      val follower = format(postRebalanceReplicasThatMoved(existing, proposed))
+
+      admin.changeTopicConfig(zkUtils, topic, propsWith(
+        (LeaderThrottledReplicasListProp, leader),
+        (FollowerThrottledReplicasListProp, follower)))
+
+      debug(s"Updated leader-throttled replicas for topic $topic with: 
$leader")
+      debug(s"Updated follower-throttled replicas for topic $topic with: 
$follower")
+    }
   }
 
-  def replicaMoves(existing: Map[TopicAndPartition, Seq[Int]], proposed: 
Map[TopicAndPartition, Seq[Int]]): Map[String, String] = {
-    //Find the replicas that have moved
-    val movesByPartition = existing.map { case (topicAndPartition, 
existingReplicas) =>
-      val before = existingReplicas.toSet
-      val after = proposed.get(topicAndPartition).get.toSet
-      val moving = (after.filterNot(before) ++ 
before.filterNot(after)).toSeq.distinct.sorted
-      val formatted = moving.map { brokerId => 
s"${topicAndPartition.partition}:$brokerId" }
-      (topicAndPartition, formatted)
+  private def postRebalanceReplicasThatMoved(existing: Map[TopicAndPartition, 
Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, 
Seq[Int]] = {
+    //For each partition in the proposed list, filter out any replicas that 
exist now (i.e. not moving)
+    existing.map { case (tp, current) =>
+      tp -> (proposed(tp).toSet -- current).toSeq
     }
-    //Group by topic
-    val movesByTopic = movesByPartition.groupBy(_._1.topic)
-      .map { case (topic, reps) => (topic, reps.values.flatMap(rep => rep)) }
+  }
+
+  private def preRebalanceReplicaForMovingPartitions(existing: 
Map[TopicAndPartition, Seq[Int]], proposed: Map[TopicAndPartition, Seq[Int]]): 
Map[TopicAndPartition, Seq[Int]] = {
+    //Throttle all existing replicas (as any one might be a leader). So just 
filter out those which aren't moving
+    existing.filter { case (tp, current) =>
+      (proposed(tp).toSet -- current).nonEmpty
+    }
+  }
+
+  def format(moves: Map[TopicAndPartition, Seq[Int]]): String =
+    moves.flatMap { case (tp, moves) =>
+      moves.map(replicaId => s"${tp.partition}:${replicaId}")
+    }.mkString(",")
 
-    movesByTopic.map { case (topic, moves) => topic -> moves.mkString(",") }
+  def filterBy(topic: String, allExisting: Map[TopicAndPartition, Seq[Int]], 
allProposed: Map[TopicAndPartition, Seq[Int]]): (Map[TopicAndPartition, 
Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
+    (allExisting.filter { case (tp, _) => tp.topic == topic },
+      allProposed.filter { case (tp, _) => tp.topic == topic })
   }
 
   def reassignPartitions(throttle: Long = -1): Boolean = {
     maybeThrottle(throttle)
     try {
-      val validPartitions = partitions.filter(p => validatePartition(zkUtils, 
p._1.topic, p._1.partition))
+      val validPartitions = proposedAssignment.filter { case (p, _) => 
validatePartition(zkUtils, p.topic, p.partition) }
       if (validPartitions.isEmpty) false
       else {
         val jsonReassignmentData = 
ZkUtils.formatAsReassignmentJson(validPartitions)

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index f01166d..0a447d4 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -17,20 +17,19 @@
 
 package kafka.log
 
-import java.util.Properties
+import java.util.{Collections, Locale, Properties}
 
 import scala.collection.JavaConverters._
 import kafka.api.ApiVersion
 import kafka.message.{BrokerCompressionCodec, Message}
-import kafka.server.{ThrottledReplicaValidator, KafkaConfig}
+import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
-import java.util.Locale
 
 import scala.collection.mutable
-import org.apache.kafka.common.config.ConfigDef.{ConfigKey, 
ValidList,Validator}
+import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, 
Validator}
 
 object Defaults {
   val SegmentSize = kafka.server.Defaults.LogSegmentBytes
@@ -55,7 +54,8 @@ object Defaults {
   val MessageFormatVersion = kafka.server.Defaults.LogMessageFormatVersion
   val MessageTimestampType = kafka.server.Defaults.LogMessageTimestampType
   val MessageTimestampDifferenceMaxMs = 
kafka.server.Defaults.LogMessageTimestampDifferenceMaxMs
-  val ThrottledReplicasList = ""
+  val LeaderThrottledReplicasList = Collections.emptyList[String]()
+  val FollowerThrottledReplicasList = Collections.emptyList[String]()
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends 
AbstractConfig(LogConfig.configDef, props, false) {
@@ -86,7 +86,8 @@ case class LogConfig(props: java.util.Map[_, _]) extends 
AbstractConfig(LogConfi
   val messageFormatVersion = 
ApiVersion(getString(LogConfig.MessageFormatVersionProp))
   val messageTimestampType = 
TimestampType.forName(getString(LogConfig.MessageTimestampTypeProp))
   val messageTimestampDifferenceMaxMs = 
getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue
-  val throttledReplicasList = getString(LogConfig.ThrottledReplicasListProp)
+  val leaderThrottledReplicasList = 
getList(LogConfig.LeaderThrottledReplicasListProp)
+  val followerThrottledReplicasList = 
getList(LogConfig.FollowerThrottledReplicasListProp)
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % 
math.min(segmentJitterMs, segmentMs)
@@ -123,7 +124,8 @@ object LogConfig {
   val MessageFormatVersionProp = "message.format.version"
   val MessageTimestampTypeProp = "message.timestamp.type"
   val MessageTimestampDifferenceMaxMsProp = 
"message.timestamp.difference.max.ms"
-  val ThrottledReplicasListProp = "quota.replication.throttled.replicas"
+  val LeaderThrottledReplicasListProp = 
"quota.leader.replication.throttled.replicas"
+  val FollowerThrottledReplicasListProp = 
"quota.follower.replication.throttled.replicas"
 
   val SegmentSizeDoc = "This configuration controls the segment file size for 
" +
     "the log. Retention and cleaning is always done a file at a time so a 
larger " +
@@ -194,7 +196,9 @@ object LogConfig {
   val MessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed 
between the timestamp when a broker receives " +
     "a message and the timestamp specified in the message. If 
message.timestamp.type=CreateTime, a message will be rejected " +
     "if the difference in timestamp exceeds this threshold. This configuration 
is ignored if message.timestamp.type=LogAppendTime."
-  val ThrottledReplicasListDoc = "A list of replicas for which log replication 
should be throttled. The list should describe a set of " +
+  val LeaderThrottledReplicasListDoc = "A list of replicas for which log 
replication should be throttled on the leader. The list should describe a set 
of " +
+    "replicas in the form 
[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:..."
+  val FollowerThrottledReplicasListDoc = "A list of replicas for which log 
replication should be throttled on the follower. The list should describe a set 
of " +
     "replicas in the form 
[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:..."
 
   private class LogConfigDef extends ConfigDef {
@@ -285,7 +289,10 @@ object LogConfig {
         KafkaConfig.LogMessageTimestampTypeProp)
       .define(MessageTimestampDifferenceMaxMsProp, LONG, 
Defaults.MessageTimestampDifferenceMaxMs,
         atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc, 
KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
-      .define(ThrottledReplicasListProp, STRING, 
Defaults.ThrottledReplicasList, ThrottledReplicaValidator, MEDIUM, 
ThrottledReplicasListDoc, ThrottledReplicasListProp)
+      .define(LeaderThrottledReplicasListProp, LIST, 
Defaults.LeaderThrottledReplicasList, ThrottledReplicaListValidator, MEDIUM,
+        LeaderThrottledReplicasListDoc, LeaderThrottledReplicasListProp)
+      .define(FollowerThrottledReplicasListProp, LIST, 
Defaults.FollowerThrottledReplicasList, ThrottledReplicaListValidator, MEDIUM,
+        FollowerThrottledReplicasListDoc, FollowerThrottledReplicasListProp)
   }
 
   def apply(): LogConfig = LogConfig(new Properties())

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index c3a07aa..c6868c7 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -66,24 +66,24 @@ class TopicConfigHandler(private val logManager: 
LogManager, kafkaConfig: KafkaC
       logs.foreach(_.config = logConfig)
     }
 
-    val brokerId = kafkaConfig.brokerId
-
-    if (topicConfig.containsKey(LogConfig.ThrottledReplicasListProp) && 
topicConfig.getProperty(LogConfig.ThrottledReplicasListProp).length > 0) {
-      val partitions = parseThrottledPartitions(topicConfig, brokerId)
-      quotas.leader.markThrottled(topic, partitions)
-      quotas.follower.markThrottled(topic, partitions)
-      logger.debug(s"Setting throttled partitions on broker $brokerId for 
topic: $topic and partitions $partitions")
-    } else {
-      quotas.leader.removeThrottle(topic)
-      quotas.follower.removeThrottle(topic)
-      logger.debug(s"Removing throttled partitions from broker $brokerId for 
topic $topic")
+    def updateThrottledList(prop: String, quotaManager: 
ReplicationQuotaManager) = {
+      if (topicConfig.containsKey(prop) && 
topicConfig.getProperty(prop).length > 0) {
+        val partitions = parseThrottledPartitions(topicConfig, 
kafkaConfig.brokerId, prop)
+        quotaManager.markThrottled(topic, partitions)
+        logger.debug(s"Setting $prop on broker ${kafkaConfig.brokerId} for 
topic: $topic and partitions $partitions")
+      } else {
+        quotaManager.removeThrottle(topic)
+        logger.debug(s"Removing $prop from broker ${kafkaConfig.brokerId} for 
topic $topic")
+      }
     }
+    updateThrottledList(LogConfig.LeaderThrottledReplicasListProp, 
quotas.leader)
+    updateThrottledList(LogConfig.FollowerThrottledReplicasListProp, 
quotas.follower)
   }
 
-  def parseThrottledPartitions(topicConfig: Properties, brokerId: Int): 
Seq[Int] = {
-    val configValue = 
topicConfig.get(LogConfig.ThrottledReplicasListProp).toString.trim
-    ThrottledReplicaValidator.ensureValid(LogConfig.ThrottledReplicasListProp, 
configValue)
-    configValue.trim match {
+  def parseThrottledPartitions(topicConfig: Properties, brokerId: Int, prop: 
String): Seq[Int] = {
+    val configValue = topicConfig.get(prop).toString.trim
+    ThrottledReplicaListValidator.ensureValidString(prop, configValue)
+    configValue match {
       case "" => Seq()
       case "*" => AllReplicas
       case _ => configValue.trim
@@ -162,16 +162,20 @@ class BrokerConfigHandler(private val brokerConfig: 
KafkaConfig, private val quo
   }
 }
 
-object ThrottledReplicaValidator extends Validator {
-  override def ensureValid(name: String, value: scala.Any): Unit = {
-    value match {
-      case s: String => if (!isValid(s))
+object ThrottledReplicaListValidator extends Validator {
+  def ensureValidString(name: String, value: String): Unit =
+    ensureValid(name, value.split(",").map(_.trim).toSeq)
+
+  override def ensureValid(name: String, value: Any): Unit = {
+    def check(proposed: Seq[Any]): Unit = {
+      if (!(proposed.forall(_.toString.trim.matches("([0-9]+:[0-9]+)?"))
+        || proposed.headOption.exists(_.toString.trim.equals("*"))))
         throw new ConfigException(name, value, s"$name  must match for format 
[partitionId],[brokerId]:[partitionId],[brokerId]:[partitionId],[brokerId] etc")
-      case _ => throw new ConfigException(name, value, s"$name  must be a 
string")
     }
-  }
-
-  private def isValid(proposed: String): Boolean = {
-    proposed.trim.equals("*") || 
proposed.trim.matches("([0-9]+:[0-9]+)?(,[0-9]+:[0-9]+)*")
+    value match {
+      case scalaSeq: Seq[_] => check(scalaSeq)
+      case javaList: java.util.List[_] => check(javaList.asScala)
+      case _ => throw new ConfigException(name, value, s"$name  must be a List 
but was ${value.getClass.getName}")
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala 
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 4edf5ed..1c059bb 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -22,7 +22,7 @@ import java.nio._
 import java.nio.channels._
 import java.util.concurrent.locks.{Lock, ReadWriteLock}
 import java.lang.management._
-import java.util.UUID
+import java.util.{Properties, UUID}
 import javax.management._
 import javax.xml.bind.DatatypeConverter
 
@@ -301,4 +301,14 @@ object CoreUtils extends Logging {
     // Remove the "==" padding at the end.
     urlSafeBase64EncodedUUID.substring(0, urlSafeBase64EncodedUUID.length - 2)
   }
+
+  def propsWith(key: String, value: String): Properties = {
+    propsWith((key, value))
+  }
+
+  def propsWith(props: (String, String)*): Properties = {
+    val properties = new Properties()
+    props.foreach { case (k, v) => properties.put(k, v) }
+    properties
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala 
b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 4452da3..609e9a2 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -31,9 +31,12 @@ import kafka.utils.{Logging, TestUtils, ZkUtils}
 import kafka.common.TopicAndPartition
 import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
 import java.io.File
+import java.util
 import kafka.utils.TestUtils._
 import kafka.admin.AdminUtils._
 import scala.collection.{Map, immutable}
+import kafka.utils.CoreUtils._
+import scala.collection.JavaConverters._
 
 class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
@@ -385,23 +388,32 @@ class AdminTest extends ZooKeeperTestHarness with Logging 
with RackAwareTest {
     val topic = "my-topic"
     val server = 
TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, 
zkConnect)))
 
-    def makeConfig(messageSize: Int, retentionMs: Long, throttledReplicas: 
String) = {
+    def makeConfig(messageSize: Int, retentionMs: Long, throttledLeaders: 
String, throttledFollowers: String) = {
       val props = new Properties()
       props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
       props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString)
-      props.setProperty(LogConfig.ThrottledReplicasListProp, throttledReplicas)
+      props.setProperty(LogConfig.LeaderThrottledReplicasListProp, 
throttledLeaders)
+      props.setProperty(LogConfig.FollowerThrottledReplicasListProp, 
throttledFollowers)
       props
     }
 
-    def checkConfig(messageSize: Int, retentionMs: Long, throttledReplicas: 
String, quotaManagerIsThrottled: Boolean) {
+    def checkConfig(messageSize: Int, retentionMs: Long, throttledLeaders: 
String, throttledFollowers: String, quotaManagerIsThrottled: Boolean) {
+      def checkList(actual: util.List[String], expected: String): Unit = {
+        assertNotNull(actual)
+        if (expected == "")
+          assertTrue(actual.isEmpty)
+        else
+          assertEquals(expected.split(",").toSeq, actual.asScala)
+      }
       TestUtils.retry(10000) {
         for(part <- 0 until partitions) {
           val log = server.logManager.getLog(TopicAndPartition(topic, part))
           assertTrue(log.isDefined)
           assertEquals(retentionMs, log.get.config.retentionMs)
           assertEquals(messageSize, log.get.config.maxMessageSize)
-          assertEquals(throttledReplicas, log.get.config.throttledReplicasList)
-          assertEquals(quotaManagerIsThrottled, 
server.quotaManagers.leader.isThrottled(new TopicAndPartition(topic, part)))
+          checkList(log.get.config.leaderThrottledReplicasList, 
throttledLeaders)
+          checkList(log.get.config.followerThrottledReplicasList, 
throttledFollowers)
+          assertEquals(quotaManagerIsThrottled, 
server.quotaManagers.leader.isThrottled(TopicAndPartition(topic, part)))
         }
       }
     }
@@ -410,15 +422,20 @@ class AdminTest extends ZooKeeperTestHarness with Logging 
with RackAwareTest {
       // create a topic with a few config overrides and check that they are 
applied
       val maxMessageSize = 1024
       val retentionMs = 1000 * 1000
-      AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, 
makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0"))
+      AdminUtils.createTopic(server.zkUtils, topic, partitions, 1, 
makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+
+      //Standard topic configs will be propagated at topic creation time, but 
the quota manager will not have been updated.
+      checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", 
false)
+
+      //Update dynamically and all properties should be applied
+      AdminUtils.changeTopicConfig(server.zkUtils, topic, 
makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
 
-      //TODO - uncommenting this line reveals a bug. The quota manager is not 
updated when properties are added on topic creation.
-      //      checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", true)
+      checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", 
true)
 
       // now double the config values for the topic and check that it is 
applied
-      val newConfig: Properties = makeConfig(2 * maxMessageSize, 2 * 
retentionMs, "*")
-      AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 * 
maxMessageSize, 2 * retentionMs, "*"))
-      checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", 
quotaManagerIsThrottled = true)
+      val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")
+      AdminUtils.changeTopicConfig(server.zkUtils, topic, makeConfig(2 * 
maxMessageSize, 2 * retentionMs, "*", "*"))
+      checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", 
quotaManagerIsThrottled = true)
 
       // Verify that the same config can be read from ZK
       val configInZk = AdminUtils.fetchEntityConfig(server.zkUtils, 
ConfigType.Topic, topic)
@@ -426,15 +443,15 @@ class AdminTest extends ZooKeeperTestHarness with Logging 
with RackAwareTest {
 
       //Now delete the config
       AdminUtils.changeTopicConfig(server.zkUtils, topic, new Properties)
-      checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, 
Defaults.ThrottledReplicasList,  quotaManagerIsThrottled = false)
+      checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", 
quotaManagerIsThrottled = false)
 
       //Add config back
-      AdminUtils.changeTopicConfig(server.zkUtils, topic, 
makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0"))
-      checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", 
quotaManagerIsThrottled = true)
+      AdminUtils.changeTopicConfig(server.zkUtils, topic, 
makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+      checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", 
quotaManagerIsThrottled = true)
 
       //Now ensure updating to "" removes the throttled replica list also
-      AdminUtils.changeTopicConfig(server.zkUtils, topic, new 
Properties(){put(LogConfig.ThrottledReplicasListProp, "")})
-      checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, 
Defaults.ThrottledReplicasList,  quotaManagerIsThrottled = false)
+      AdminUtils.changeTopicConfig(server.zkUtils, topic, 
propsWith((LogConfig.FollowerThrottledReplicasListProp, ""), 
(LogConfig.LeaderThrottledReplicasListProp, "")))
+      checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "",  
quotaManagerIsThrottled = false)
 
     } finally {
       server.shutdown()
@@ -460,12 +477,12 @@ class AdminTest extends ZooKeeperTestHarness with Logging 
with RackAwareTest {
       val limit: Long = 1000000
 
       // Set the limit & check it is applied to the log
-      changeBrokerConfig(servers(0).zkUtils, brokerIds,  
wrapInProps(ThrottledReplicationRateLimitProp, limit.toString))
+      changeBrokerConfig(servers(0).zkUtils, brokerIds,  
propsWith(ThrottledReplicationRateLimitProp, limit.toString))
       checkConfig(limit)
 
       // Now double the config values for the topic and check that it is 
applied
       val newLimit = 2 * limit
-      changeBrokerConfig(servers(0).zkUtils, brokerIds,  
wrapInProps(ThrottledReplicationRateLimitProp, newLimit.toString))
+      changeBrokerConfig(servers(0).zkUtils, brokerIds,  
propsWith(ThrottledReplicationRateLimitProp, newLimit.toString))
       checkConfig(newLimit)
 
       // Verify that the same config can be read from ZK

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index b96240c..6d29211 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -12,7 +12,7 @@
   */
 package kafka.admin
 
-import kafka.common.TopicAndPartition
+import kafka.common.{AdminCommandFailedException, TopicAndPartition}
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
 import kafka.utils.ZkUtils._
@@ -127,7 +127,7 @@ class ReassignPartitionsClusterTest extends 
ZooKeeperTestHarness with Logging {
     ReassignPartitionsCommand.executeAssignment(zkUtils, 
ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
 
     //Check throttle config. Should be throttling replica 0 on 100 and 102 
only.
-    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, 
"0:100,0:102")
+    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, 
"0:100,0:101", "0:102")
 
     //Await completion
     waitForReassignmentToComplete()
@@ -177,8 +177,14 @@ class ReassignPartitionsClusterTest extends 
ZooKeeperTestHarness with Logging {
     ReassignPartitionsCommand.executeAssignment(zkUtils, 
ZkUtils.formatAsReassignmentJson(newAssignment), throttle)
 
     //Check throttle config. Should be throttling specific replicas for each 
topic.
-    checkThrottleConfigAddedToZK(throttle, servers, "topic1", 
"1:101,1:102,0:101,0:102")
-    checkThrottleConfigAddedToZK(throttle, servers, "topic2", 
"1:103,1:104,0:103,0:104")
+    checkThrottleConfigAddedToZK(throttle, servers, "topic1",
+      "1:100,1:101,0:100,0:101", //All replicas for moving partitions should 
be leader-throttled
+      "1:102,0:102" //Move destinations should be follower throttled.
+    )
+    checkThrottleConfigAddedToZK(throttle, servers, "topic2",
+      "1:104,1:105,0:104,0:105", //All replicas for moving partitions should 
be leader-throttled
+      "1:103,0:103" //Move destinations should be follower throttled.
+    )
   }
 
   @Test
@@ -200,13 +206,13 @@ class ReassignPartitionsClusterTest extends 
ZooKeeperTestHarness with Logging {
     ReassignPartitionsCommand.executeAssignment(zkUtils, 
ZkUtils.formatAsReassignmentJson(newAssignment), initialThrottle)
 
     //Check throttle config
-    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, 
"0:100,0:102")
+    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, 
"0:100,0:101", "0:102")
 
     //Ensure that running Verify, whilst the command is executing, should have 
no effect
     ReassignPartitionsCommand.verifyAssignment(zkUtils, 
ZkUtils.formatAsReassignmentJson(newAssignment))
 
     //Check throttle config again
-    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, 
"0:100,0:102")
+    checkThrottleConfigAddedToZK(initialThrottle, servers, topicName, 
"0:100,0:101", "0:102")
 
     //Now re-run the same assignment with a larger throttle, which should only 
act to increase the throttle and make progress
     val newThrottle = initialThrottle * 1000
@@ -214,7 +220,7 @@ class ReassignPartitionsClusterTest extends 
ZooKeeperTestHarness with Logging {
     ReassignPartitionsCommand.executeAssignment(zkUtils, 
ZkUtils.formatAsReassignmentJson(newAssignment), newThrottle)
 
     //Check throttle was changed
-    checkThrottleConfigAddedToZK(newThrottle, servers, topicName, 
"0:100,0:102")
+    checkThrottleConfigAddedToZK(newThrottle, servers, topicName, 
"0:100,0:101", "0:102")
 
     //Await completion
     waitForReassignmentToComplete()
@@ -230,6 +236,16 @@ class ReassignPartitionsClusterTest extends 
ZooKeeperTestHarness with Logging {
     assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(101, 102))
   }
 
+  @Test(expected = classOf[AdminCommandFailedException])
+  def shouldFailIfProposedDoesNotMatchExisting() {
+    //Given a single replica on server 100
+    startBrokers(Seq(100, 101))
+    createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    //When we execute an assignment that includes an invalid partition (1:101 
in this case)
+    ReassignPartitionsCommand.executeAssignment(zkUtils, 
s"""{"version":1,"partitions":[{"topic":"$topicName","partition":1,"replicas":[101]}]}""")
+  }
+
   def waitForReassignmentToComplete() {
     waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode 
${ZkUtils.ReassignPartitionsPath} wasn't deleted")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 91d5e20..f66dbed 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -16,8 +16,11 @@
  */
 package kafka.admin
 
+import java.util.Properties
+
 import kafka.common.TopicAndPartition
-import kafka.utils.{Logging, TestUtils}
+import kafka.log.LogConfig._
+import kafka.utils.{Logging, TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.Test
 import org.junit.Assert.assertEquals
@@ -52,48 +55,71 @@ class ReassignPartitionsCommandTest extends 
ZooKeeperTestHarness with Logging wi
 
   @Test
   def shouldFindMovingReplicas() {
+    val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
     val assigner = new ReassignPartitionsCommand(null, null)
 
-    //Given partition 0 moves from broker 100 -> 102
-    val existing = Map(TopicAndPartition("topic1",0) -> Seq(100, 101))
-    val proposed = Map(TopicAndPartition("topic1",0) -> Seq(101, 102))
+    //Given partition 0 moves from broker 100 -> 102. Partition 1 does not 
move.
+    val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), 
control)
+    val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102), 
control)
+
 
-      //When
-    val moves = assigner.replicaMoves(existing, proposed)
+    val mock = new TestAdminUtils {
+      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, 
configChange: Properties): Unit = {
+        assertEquals("0:102", 
configChange.get(FollowerThrottledReplicasListProp)) //Should only be 
follower-throttle the moving replica
+        assertEquals("0:100,0:101", 
configChange.get(LeaderThrottledReplicasListProp)) //Should leader-throttle all 
existing (pre move) replicas
+      }
+    }
 
-    //Then moving replicas should be throttled
-    assertEquals("0:100,0:102", moves.get("topic1").get)
+    assigner.assignThrottledReplicas(existing, proposed, mock)
   }
 
   @Test
   def shouldFindMovingReplicasMultiplePartitions() {
+    val control = TopicAndPartition("topic1", 2) -> Seq(100, 102)
     val assigner = new ReassignPartitionsCommand(null, null)
 
-    //Given partitions 0 & 1 moves from broker 100 -> 102
-    val existing = Map(TopicAndPartition("topic1",0) -> Seq(100, 101), 
TopicAndPartition("topic1",1) -> Seq(100, 101))
-    val proposed = Map(TopicAndPartition("topic1",0) -> Seq(101, 102), 
TopicAndPartition("topic1",1) -> Seq(101, 102))
+    //Given partitions 0 & 1 moves from broker 100 -> 102. Partition 2 does 
not move.
+    val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), 
TopicAndPartition("topic1", 1) -> Seq(100, 101), control)
+    val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102), 
TopicAndPartition("topic1", 1) -> Seq(101, 102), control)
 
-      //When
-    val moves = assigner.replicaMoves(existing, proposed)
+    // Then
+    val mock = new TestAdminUtils {
+      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, 
configChange: Properties): Unit = {
+        assertEquals("0:102,1:102", 
configChange.get(FollowerThrottledReplicasListProp)) //Should only be 
follower-throttle the moving replica
+        assertEquals("0:100,0:101,1:100,1:101", 
configChange.get(LeaderThrottledReplicasListProp)) //Should leader-throttle all 
existing (pre move) replicas
+      }
+    }
 
-    //Then moving replicas should be throttled
-    assertEquals("0:100,0:102,1:100,1:102", moves.get("topic1").get)
+    //When
+    assigner.assignThrottledReplicas(existing, proposed, mock)
   }
 
   @Test
   def shouldFindMovingReplicasMultipleTopics() {
+    val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
     val assigner = new ReassignPartitionsCommand(null, null)
 
-    //Given partition 0 on topics 1 & 2 move from broker 100 -> 102
-    val existing = Map(TopicAndPartition("topic1",0) -> Seq(100, 101), 
TopicAndPartition("topic2",0) -> Seq(100, 101))
-    val proposed = Map(TopicAndPartition("topic1",0) -> Seq(101, 102), 
TopicAndPartition("topic2",0) -> Seq(101, 102))
+    //Given topics 1 -> move from broker 100 -> 102, topics 2 -> move from 
broker 101 -> 100
+    val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101), 
TopicAndPartition("topic2", 0) -> Seq(101, 102), control)
+    val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(101, 102), 
TopicAndPartition("topic2", 0) -> Seq(100, 102), control)
+
+    //Then
+    val mock = new TestAdminUtils {
+      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, 
configChange: Properties): Unit = {
+        topic match {
+          case "topic1" =>
+            assertEquals("0:102", 
configChange.get(FollowerThrottledReplicasListProp))
+            assertEquals("0:100,0:101", 
configChange.get(LeaderThrottledReplicasListProp))
+          case "topic2" =>
+            assertEquals("0:100", 
configChange.get(FollowerThrottledReplicasListProp))
+            assertEquals("0:101,0:102", 
configChange.get(LeaderThrottledReplicasListProp))
+          case _ => fail("Unexpected topic $topic")
+        }
+      }
+    }
 
     //When
-    val moves = assigner.replicaMoves(existing, proposed)
-
-    //Then moving replicas should be throttled
-    assertEquals("0:100,0:102", moves.get("topic1").get)
-    assertEquals("0:100,0:102", moves.get("topic2").get)
+    assigner.assignThrottledReplicas(existing, proposed, mock)
   }
 
   @Test
@@ -102,38 +128,56 @@ class ReassignPartitionsCommandTest extends 
ZooKeeperTestHarness with Logging wi
 
     //Given
     val existing = Map(
-      TopicAndPartition("topic1",0) -> Seq(100, 101),
-      TopicAndPartition("topic1",1) -> Seq(100, 101),
-      TopicAndPartition("topic2",0) -> Seq(100, 101),
-      TopicAndPartition("topic2",1) -> Seq(100, 101)
+      TopicAndPartition("topic1", 0) -> Seq(100, 101),
+      TopicAndPartition("topic1", 1) -> Seq(100, 101),
+      TopicAndPartition("topic2", 0) -> Seq(101, 102),
+      TopicAndPartition("topic2", 1) -> Seq(101, 102)
     )
     val proposed = Map(
-      TopicAndPartition("topic1",0) -> Seq(101, 102),
-      TopicAndPartition("topic1",1) -> Seq(101, 102),
-      TopicAndPartition("topic2",0) -> Seq(101, 102),
-      TopicAndPartition("topic2",1) -> Seq(101, 102)
+      TopicAndPartition("topic1", 0) -> Seq(101, 102), //moves to 102
+      TopicAndPartition("topic1", 1) -> Seq(101, 102), //moves to 102
+      TopicAndPartition("topic2", 0) -> Seq(100, 102), //moves to 100
+      TopicAndPartition("topic2", 1) -> Seq(101, 100)  //moves to 100
     )
 
-    //When
-    val moves = assigner.replicaMoves(existing, proposed)
+    //Then
+    val mock = new TestAdminUtils {
+      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, 
configChange: Properties): Unit = {
+        topic match {
+          case "topic1" =>
+            assertEquals("0:102,1:102", 
configChange.get(FollowerThrottledReplicasListProp))
+            assertEquals("0:100,0:101,1:100,1:101", 
configChange.get(LeaderThrottledReplicasListProp))
+          case "topic2" =>
+            assertEquals("0:100,1:100", 
configChange.get(FollowerThrottledReplicasListProp))
+            assertEquals("0:101,0:102,1:101,1:102", 
configChange.get(LeaderThrottledReplicasListProp))
+          case _ => fail()
+        }
+      }
+    }
 
-    //Then moving replicas should be throttled
-    assertEquals("0:100,0:102,1:100,1:102", moves.get("topic1").get)
-    assertEquals("0:100,0:102,1:100,1:102", moves.get("topic2").get)
+    //When
+    assigner.assignThrottledReplicas(existing, proposed, mock)
   }
 
+
   @Test
   def shouldFindTwoMovingReplicasInSamePartition() {
+    val control = TopicAndPartition("topic1", 1) -> Seq(100, 102)
     val assigner = new ReassignPartitionsCommand(null, null)
 
     //Given partition 0 has 2 moves from broker 102 -> 104 & 103 -> 105
-    val existing = Map(TopicAndPartition("topic1",0) -> Seq(100, 101, 102, 
103))
-    val proposed = Map(TopicAndPartition("topic1",0) -> Seq(100, 101, 104, 
105))
+    val existing = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101, 102, 
103), control)
+    val proposed = Map(TopicAndPartition("topic1", 0) -> Seq(100, 101, 104, 
105), control)
+
+    // Then
+    val mock = new TestAdminUtils {
+      override def changeTopicConfig(zkUtils: ZkUtils, topic: String, 
configChange: Properties) = {
+        assertEquals("0:104,0:105", 
configChange.get(FollowerThrottledReplicasListProp)) //Should only be 
follower-throttle the moving replicas
+        assertEquals("0:100,0:101,0:102,0:103", 
configChange.get(LeaderThrottledReplicasListProp)) //Should leader-throttle all 
existing (pre move) replicas
+      }
+    }
 
     //When
-    val moves = assigner.replicaMoves(existing, proposed)
-
-    //Then moving replicas should be throttled
-    assertEquals( "0:102,0:103,0:104,0:105", moves.get("topic1").get)
+    assigner.assignThrottledReplicas(existing, proposed, mock)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala 
b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
index e2c0541..004067b 100644
--- a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala
@@ -27,12 +27,13 @@ object ReplicationQuotaUtils {
         
!brokerConfig.contains(DynamicConfig.Broker.ThrottledReplicationRateLimitProp)
       }
       val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, 
ConfigType.Topic, topic)
-      val topicReset = 
!topicConfig.contains(LogConfig.ThrottledReplicasListProp)
+      val topicReset = 
!(topicConfig.contains(LogConfig.LeaderThrottledReplicasListProp)
+        || topicConfig.contains(LogConfig.FollowerThrottledReplicasListProp))
       brokerReset && topicReset
     }, "Throttle limit/replicas was not unset")
   }
 
-  def checkThrottleConfigAddedToZK(expectedThrottleRate: Long, servers: 
Seq[KafkaServer], topic: String, throttledReplicas: String): Boolean = {
+  def checkThrottleConfigAddedToZK(expectedThrottleRate: Long, servers: 
Seq[KafkaServer], topic: String, throttledLeaders: String, throttledFollowers: 
String): Boolean = {
     TestUtils.waitUntilTrue(() => {
       //Check for limit in ZK
       val brokerConfigAvailable = servers.forall { server =>
@@ -42,9 +43,9 @@ object ReplicationQuotaUtils {
       }
       //Check replicas assigned
       val topicConfig = AdminUtils.fetchEntityConfig(servers(0).zkUtils, 
ConfigType.Topic, topic)
-      val property: String = 
topicConfig.getProperty(LogConfig.ThrottledReplicasListProp)
-      println(topic + "we found "+property)
-      val topicConfigAvailable = property == throttledReplicas
+      val leader = 
topicConfig.getProperty(LogConfig.LeaderThrottledReplicasListProp)
+      val follower = 
topicConfig.getProperty(LogConfig.FollowerThrottledReplicasListProp)
+      val topicConfigAvailable = (leader == throttledLeaders && follower == 
throttledFollowers)
       brokerConfigAvailable && topicConfigAvailable
     }, "throttle limit/replicas was not set")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 3f2e3ed..6d34c78 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 
 import java.util.Properties
 
-import kafka.server.{ThrottledReplicaValidator, KafkaConfig, KafkaServer}
+import kafka.server.{ThrottledReplicaListValidator, KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.config.ConfigException
 import org.junit.{Assert, Test}
@@ -74,22 +74,25 @@ class LogConfigTest {
     assertTrue(isValid("100:10,12:10"))
     assertTrue(isValid("100:10,12:10,15:1"))
     assertTrue(isValid("100:10,12:10,15:1  "))
+    assertTrue(isValid("100:0,"))
 
     assertFalse(isValid("100"))
     assertFalse(isValid("100:"))
-    assertFalse(isValid("100:0,"))
     assertFalse(isValid("100:0,10"))
     assertFalse(isValid("100:0,10:"))
     assertFalse(isValid("100:0,10:   "))
+    assertFalse(isValid("100 :0,10:   "))
+    assertFalse(isValid("100: 0,10:   "))
+    assertFalse(isValid("100:0,10 :   "))
   }
 
   private def isValid(configValue: String): Boolean = {
     try {
-      ThrottledReplicaValidator.ensureValid("", configValue)
+      ThrottledReplicaListValidator.ensureValidString("", configValue)
+      true
     } catch {
-      case e: ConfigException => return false
+      case _: ConfigException => false
     }
-    true
   }
 
   private def assertPropertyInvalid(name: String, values: AnyRef*) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 01b1366..cf3413a 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -203,11 +203,11 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     val props: Properties = new Properties()
 
     //Given
-    props.put(ThrottledReplicasListProp, "0:101,0:102,1:101,1:102")
+    props.put(LeaderThrottledReplicasListProp, "0:101,0:102,1:101,1:102")
 
     //When/Then
-    assertEquals(Seq(0,1), configHandler.parseThrottledPartitions(props, 102))
-    assertEquals(Seq(), configHandler.parseThrottledPartitions(props, 103))
+    assertEquals(Seq(0,1), configHandler.parseThrottledPartitions(props, 102, 
LeaderThrottledReplicasListProp))
+    assertEquals(Seq(), configHandler.parseThrottledPartitions(props, 103, 
LeaderThrottledReplicasListProp))
   }
 
   @Test
@@ -216,10 +216,10 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     val props: Properties = new Properties()
 
     //Given
-    props.put(ThrottledReplicasListProp, "*")
+    props.put(LeaderThrottledReplicasListProp, "*")
 
     //When
-    val result = configHandler.parseThrottledPartitions(props, 102)
+    val result = configHandler.parseThrottledPartitions(props, 102, 
LeaderThrottledReplicasListProp)
 
     //Then
     assertEquals(AllReplicas, result)
@@ -231,12 +231,26 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     val props: Properties = new Properties()
 
     //Given
-    props.put(ThrottledReplicasListProp, "")
+    props.put(FollowerThrottledReplicasListProp, "")
 
     //When
-    val result = configHandler.parseThrottledPartitions(props, 102)
+    val result = configHandler.parseThrottledPartitions(props, 102, 
FollowerThrottledReplicasListProp)
 
     //Then
     assertEquals(Seq(), result)
   }
+
+  @Test
+  def shouldParseRegardlessOfWhitespaceAroundValues() {
+    val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, 
null)
+    assertEquals(AllReplicas, parse(configHandler, "* "))
+    assertEquals(Seq(), parse(configHandler, " "))
+    assertEquals(Seq(6), parse(configHandler, "6:102"))
+    assertEquals(Seq(6), parse(configHandler, "6:102 "))
+    assertEquals(Seq(6), parse(configHandler, " 6:102"))
+  }
+
+  def parse(configHandler: TopicConfigHandler, value: String): Seq[Int] = {
+    
configHandler.parseThrottledPartitions(CoreUtils.propsWith(LeaderThrottledReplicasListProp,
 value), 102, LeaderThrottledReplicasListProp)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index 7808bba..c0fc08b 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -17,12 +17,12 @@
 package kafka.server
 
 import kafka.admin.AdminUtils
-import kafka.utils.TestUtils._
 import kafka.utils.ZkUtils
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.config._
 import org.easymock.EasyMock
 import org.junit.{Before, Test}
+import kafka.utils.CoreUtils._
 
 class DynamicConfigTest {
   private final val nonExistentConfig: String = 
"some.config.that.does.not.exist"
@@ -38,21 +38,21 @@ class DynamicConfigTest {
 
   @Test(expected = classOf[IllegalArgumentException])
   def shouldFailWhenChangingBrokerUnknownConfig() {
-    AdminUtils.changeBrokerConfig(zkUtils, Seq(0), 
wrapInProps(nonExistentConfig, someValue))
+    AdminUtils.changeBrokerConfig(zkUtils, Seq(0), 
propsWith(nonExistentConfig, someValue))
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def shouldFailWhenChangingClientIdUnknownConfig() {
-    AdminUtils.changeClientIdConfig(zkUtils, "ClientId", 
wrapInProps(nonExistentConfig, someValue))
+    AdminUtils.changeClientIdConfig(zkUtils, "ClientId", 
propsWith(nonExistentConfig, someValue))
   }
 
   @Test(expected = classOf[IllegalArgumentException])
   def shouldFailWhenChangingUserUnknownConfig() {
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "UserId", 
wrapInProps(nonExistentConfig, someValue))
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "UserId", 
propsWith(nonExistentConfig, someValue))
   }
 
   @Test(expected = classOf[ConfigException])
   def shouldFailConfigsWithInvalidValues() {
-    AdminUtils.changeBrokerConfig(zkUtils, Seq(0), 
wrapInProps(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, "-100"))
+    AdminUtils.changeBrokerConfig(zkUtils, Seq(0), 
propsWith(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, "-100"))
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index e2739a0..3fc6f7d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -25,8 +25,8 @@ import kafka.common._
 import kafka.log.LogConfig._
 import kafka.server.KafkaConfig.fromProps
 import kafka.server.QuotaType._
-import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
+import kafka.utils.CoreUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.junit.Assert._
@@ -105,12 +105,14 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
     //Set the throttle limit on all 8 brokers, but only assign throttled 
replicas to the six leaders, or two followers
     (100 to 107).foreach { brokerId =>
-      changeBrokerConfig(zkUtils, Seq(brokerId), 
wrapInProps(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, 
throttle.toString))
+      changeBrokerConfig(zkUtils, Seq(brokerId), 
propsWith(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, 
throttle.toString))
     }
 
     //Either throttle the six leaders or the two followers
-    val throttledReplicas = if (leaderThrottle) 
"0:100,1:101,2:102,3:103,4:104,5:105" else "0:106,1:106,2:106,3:107,4:107,5:107"
-    changeTopicConfig(zkUtils, topic, wrapInProps(ThrottledReplicasListProp, 
throttledReplicas))
+    if (leaderThrottle)
+      changeTopicConfig(zkUtils, topic, 
propsWith(LeaderThrottledReplicasListProp, 
"0:100,1:101,2:102,3:103,4:104,5:105" ))
+    else
+      changeTopicConfig(zkUtils, topic, 
propsWith(FollowerThrottledReplicasListProp, 
"0:106,1:106,2:106,3:107,4:107,5:107"))
 
     //Add data equally to each partition
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries 
= 5, acks = 0)
@@ -187,8 +189,8 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     val throttle: Long = msg.length * msgCount / expectedDuration
 
     //Set the throttle limit leader
-    changeBrokerConfig(zkUtils, Seq(100), 
wrapInProps(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, 
throttle.toString))
-    changeTopicConfig(zkUtils, topic, wrapInProps(ThrottledReplicasListProp, 
"0:100"))
+    changeBrokerConfig(zkUtils, Seq(100), 
propsWith(DynamicConfig.Broker.ThrottledReplicationRateLimitProp, 
throttle.toString))
+    changeTopicConfig(zkUtils, topic, 
propsWith(LeaderThrottledReplicasListProp, "0:100"))
 
     //Add data
     addData(msgCount, msg)

http://git-wip-us.apache.org/repos/asf/kafka/blob/18e05117/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 29ce693..3796e48 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1125,13 +1125,6 @@ object TestUtils extends Logging {
     assertTrue(s"$message failed with exception(s) $exceptions", 
exceptions.isEmpty)
 
   }
-
-  def wrapInProps(key: String, value: String): Properties = {
-    val props: Properties = new Properties()
-    props.put(key, value)
-    props
-  }
-
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {

Reply via email to