Updated Branches: refs/heads/0.8 b1891e700 -> 3817857b1
kafka-347; change number of partitions of a topic online; patched by Sriram Subramanian; reviewed by Neha Narkehede, Guozhang Wang, Joel Koshy and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3817857b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3817857b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3817857b Branch: refs/heads/0.8 Commit: 3817857b15f9ce03f10a9730f0ff4619d728b06f Parents: b1891e7 Author: Sriram Subramanian <[email protected]> Authored: Mon Jul 22 21:33:28 2013 -0700 Committer: Jun Rao <[email protected]> Committed: Mon Jul 22 21:33:28 2013 -0700 ---------------------------------------------------------------------- bin/kafka-add-partitions.sh | 19 ++ .../kafka/admin/AddPartitionsCommand.scala | 127 ++++++++++ .../src/main/scala/kafka/admin/AdminUtils.scala | 27 +- .../scala/kafka/admin/CreateTopicCommand.scala | 2 +- .../kafka/controller/KafkaController.scala | 2 + .../controller/PartitionStateMachine.scala | 28 ++- .../unit/kafka/admin/AddPartitionsTest.scala | 251 +++++++++++++++++++ .../test/scala/unit/kafka/admin/AdminTest.scala | 16 +- 8 files changed, 448 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/bin/kafka-add-partitions.sh ---------------------------------------------------------------------- diff --git a/bin/kafka-add-partitions.sh b/bin/kafka-add-partitions.sh new file mode 100755 index 0000000..7d217e2 --- /dev/null +++ b/bin/kafka-add-partitions.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +base_dir=$(dirname $0) +export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" +$base_dir/kafka-run-class.sh kafka.admin.AddPartitionsCommand $@ http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala new file mode 100644 index 0000000..5757c32 --- /dev/null +++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import joptsimple.OptionParser +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient +import scala.collection.mutable +import kafka.common.TopicAndPartition + +object AddPartitionsCommand extends Logging { + + def main(args: Array[String]): Unit = { + val parser = new OptionParser + val topicOpt = parser.accepts("topic", "REQUIRED: The topic for which partitions need to be added.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val nPartitionsOpt = parser.accepts("partition", "REQUIRED: Number of partitions to add to the topic") + .withRequiredArg + .describedAs("# of partitions") + .ofType(classOf[java.lang.Integer]) + val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers for the new partitions") + .withRequiredArg + .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " + + "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...") + .ofType(classOf[String]) + .defaultsTo("") + + val options = parser.parse(args : _*) + + for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) { + if(!options.has(arg)) { + System.err.println("***Please note that this tool can only be used to add partitions when data for a topic does not use a key.***\n" + + "Missing required argument. " + " \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val topic = options.valueOf(topicOpt) + val zkConnect = options.valueOf(zkConnectOpt) + val nPartitions = options.valueOf(nPartitionsOpt).intValue + val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt) + var zkClient: ZkClient = null + try { + zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) + println("adding partitions succeeded!") + } catch { + case e => + println("adding partitions failed because of " + e.getMessage) + println(Utils.stackTrace(e)) + } finally { + if (zkClient != null) + zkClient.close() + } + } + + def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") { + val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) + if (existingPartitionsReplicaList.size == 0) + throw new AdministrationException("The topic %s does not exist".format(topic)) + + val existingReplicaList = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get + + // create the new partition replication list + val brokerList = ZkUtils.getSortedBrokerList(zkClient) + val newPartitionReplicaList = if (replicaAssignmentStr == "") + AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size) + else + getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size) + + // check if manual assignment has the right replication factor + val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size)) + if (unmatchedRepFactorList.size != 0) + throw new AdministrationException("The replication factor in manual replication assignment " + + " is not equal to the existing replication factor for the topic " + existingReplicaList.size) + + info("Add partition list for %s is %s".format(topic, newPartitionReplicaList)) + val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) + // add the new list + partitionReplicaList ++= newPartitionReplicaList + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaList, zkClient, true) + } + + def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = { + val partitionList = replicaAssignmentList.split(",") + val ret = new mutable.HashMap[Int, List[Int]]() + var partitionId = startPartitionId + for (i <- 0 until partitionList.size) { + val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) + if (brokerList.size <= 0) + throw new AdministrationException("replication factor must be larger than 0") + if (brokerList.size != brokerList.toSet.size) + throw new AdministrationException("duplicate brokers in replica assignment: " + brokerList) + if (!brokerList.toSet.subsetOf(availableBrokerList)) + throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString + + "available broker:" + availableBrokerList.toString) + ret.put(partitionId, brokerList.toList) + if (ret(partitionId).size != ret(startPartitionId).size) + throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList) + partitionId = partitionId + 1 + } + ret.toMap + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 41cb764..c399bc7 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -48,7 +48,7 @@ object AdminUtils extends Logging { * p7 p8 p9 p5 p6 (3nd replica) */ def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int, - fixedStartIndex: Int = -1) // for testing only + fixedStartIndex: Int = -1, startPartitionId: Int = -1) : Map[Int, Seq[Int]] = { if (nPartitions <= 0) throw new AdministrationException("number of partitions must be larger than 0") @@ -59,25 +59,34 @@ object AdminUtils extends Logging { " larger than available brokers: " + brokerList.size) val ret = new mutable.HashMap[Int, List[Int]]() val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) + var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0 - var secondReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) + var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size) for (i <- 0 until nPartitions) { - if (i > 0 && (i % brokerList.size == 0)) - secondReplicaShift += 1 - val firstReplicaIndex = (i + startIndex) % brokerList.size + if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0)) + nextReplicaShift += 1 + val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size var replicaList = List(brokerList(firstReplicaIndex)) for (j <- 0 until replicationFactor - 1) - replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size)) - ret.put(i, replicaList.reverse) + replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size)) + ret.put(currentPartitionId, replicaList.reverse) + currentPartitionId = currentPartitionId + 1 } ret.toMap } - def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient) { + def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, Seq[Int]], zkClient: ZkClient, update: Boolean = false) { try { val zkPath = ZkUtils.getTopicPath(topic) val jsonPartitionData = ZkUtils.replicaAssignmentZkdata(replicaAssignment.map(e => (e._1.toString -> e._2))) - ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData) + + if (!update) { + info("Topic creation " + jsonPartitionData.toString) + ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData) + } else { + info("Topic update " + jsonPartitionData.toString) + ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData) + } debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) } catch { case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic)) http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala index e762115..21c1186 100644 --- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala @@ -93,7 +93,7 @@ object CreateTopicCommand extends Logging { else getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet) debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment)) - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient) } def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int]): Map[Int, List[Int]] = { http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 5ac38fd..b07e27b 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -215,6 +215,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg initializeControllerContext() replicaStateMachine.startup() partitionStateMachine.startup() + // register the partition change listeners for all existing topics on failover + controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) Utils.registerMBean(this, KafkaController.MBeanName) info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) initializeAndMaybeTriggerPartitionReassignment() http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index deebed0..0135d45 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import kafka.api.LeaderAndIsr import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} import kafka.utils.{Logging, ZkUtils} -import org.I0Itec.zkclient.IZkChildListener +import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener} import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.apache.log4j.Logger @@ -334,7 +334,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } def registerPartitionChangeListener(topic: String) = { - zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic)) + zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), new AddPartitionsListener(topic)) } private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = { @@ -383,15 +383,31 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } - class PartitionChangeListener(topic: String) extends IZkChildListener with Logging { - this.logIdent = "[Controller " + controller.config.brokerId + "]: " + + class AddPartitionsListener(topic: String) extends IZkDataListener with Logging { + + this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: " @throws(classOf[Exception]) - def handleChildChange(parentPath : String, children : java.util.List[String]) { + def handleDataChange(dataPath : String, data: Object) { controllerContext.controllerLock synchronized { - // TODO: To be completed as part of KAFKA-41 + try { + info("Add Partition triggered " + data.toString + " for path " + dataPath) + val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) + val partitionsRemainingToBeAdded = partitionReplicaAssignment.filter(p => + !controllerContext.partitionReplicaAssignment.contains(p._1)) + info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded)) + controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet) + } catch { + case e => error("Error while handling add partitions for data path " + dataPath, e ) + } } } + + @throws(classOf[Exception]) + def handleDataDeleted(parentPath : String) { + // this is not implemented for partition change + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala new file mode 100644 index 0000000..06be990 --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import org.scalatest.junit.JUnit3Suite +import kafka.zk.ZooKeeperTestHarness +import kafka.utils.TestUtils._ +import junit.framework.Assert._ +import kafka.utils.{ZkUtils, Utils, TestUtils} +import kafka.cluster.Broker +import kafka.client.ClientUtils +import kafka.server.{KafkaConfig, KafkaServer} + +class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { + val brokerId1 = 0 + val brokerId2 = 1 + val brokerId3 = 2 + val brokerId4 = 3 + + val port1 = TestUtils.choosePort() + val port2 = TestUtils.choosePort() + val port3 = TestUtils.choosePort() + val port4 = TestUtils.choosePort() + + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) + val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3) + val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4) + + var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + var brokers: Seq[Broker] = Seq.empty[Broker] + + val partitionId = 0 + + val topic1 = "new-topic1" + val topic2 = "new-topic2" + val topic3 = "new-topic3" + val topic4 = "new-topic4" + + override def setUp() { + super.setUp() + // start all the servers + val server1 = TestUtils.createServer(new KafkaConfig(configProps1)) + val server2 = TestUtils.createServer(new KafkaConfig(configProps2)) + val server3 = TestUtils.createServer(new KafkaConfig(configProps3)) + val server4 = TestUtils.createServer(new KafkaConfig(configProps4)) + + servers ++= List(server1, server2, server3, server4) + brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port)) + + // create topics with 1 partition, 2 replicas, one on each broker + CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1") + CreateTopicCommand.createTopic(zkClient, topic2, 1, 2, "1:2") + CreateTopicCommand.createTopic(zkClient, topic3, 1, 4, "2:3:0:1") + CreateTopicCommand.createTopic(zkClient, topic4, 1, 4, "0:3") + + + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId, 500) + var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId, 500) + var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId, 500) + + debug("Leader for " + topic1 + " is elected to be: %s".format(leader1.getOrElse(-1))) + debug("Leader for " + topic2 + " is elected to be: %s".format(leader1.getOrElse(-1))) + debug("Leader for " + topic3 + "is elected to be: %s".format(leader1.getOrElse(-1))) + debug("Leader for " + topic4 + "is elected to be: %s".format(leader1.getOrElse(-1))) + + assertTrue("Leader should get elected", leader1.isDefined) + assertTrue("Leader should get elected", leader2.isDefined) + assertTrue("Leader should get elected", leader3.isDefined) + assertTrue("Leader should get elected", leader4.isDefined) + + assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1)) + assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2)) + assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3)) + assertTrue("Leader could be broker 3 or broker 4 for " + topic4, (leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3)) + } + + override def tearDown() { + servers.map(server => server.shutdown()) + servers.map(server => Utils.rm(server.config.logDirs)) + super.tearDown() + } + + def testTopicDoesNotExist { + try { + AddPartitionsCommand.addPartitions(zkClient, "Blah", 1) + fail("Topic should not exist") + } catch { + case e: AdministrationException => //this is good + case e2 => throw e2 + } + } + + def testWrongReplicaCount { + try { + AddPartitionsCommand.addPartitions(zkClient, topic1, 2, "0:1:2") + fail("Add partitions should fail") + } catch { + case e: AdministrationException => //this is good + case e2 => throw e2 + } + } + + def testIncrementPartitions { + AddPartitionsCommand.addPartitions(zkClient, topic1, 2) + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500) + val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 1).get + val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic1, 2).get + assertEquals(leader1.get, leader1FromZk) + assertEquals(leader2.get, leader2FromZk) + + // read metadata from a broker and verify the new topic partitions exist + TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2, 1000) + val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions", + 2000,0).topicsMetadata + val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1)) + val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata + assertEquals(partitionDataForTopic1.size, 3) + assertEquals(partitionDataForTopic1(1).partitionId, 1) + assertEquals(partitionDataForTopic1(2).partitionId, 2) + val replicas = partitionDataForTopic1(1).replicas + assertEquals(replicas.size, 2) + assert(replicas.contains(partitionDataForTopic1(1).leader.get)) + } + + def testManualAssignmentOfReplicas { + AddPartitionsCommand.addPartitions(zkClient, topic2, 2, "0:1,2:3") + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500) + val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 1).get + val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic2, 2).get + assertEquals(leader1.get, leader1FromZk) + assertEquals(leader2.get, leader2FromZk) + + // read metadata from a broker and verify the new topic partitions exist + TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2, 1000) + val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas", + 2000,0).topicsMetadata + val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2)) + val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata + assertEquals(partitionDataForTopic2.size, 3) + assertEquals(partitionDataForTopic2(1).partitionId, 1) + assertEquals(partitionDataForTopic2(2).partitionId, 2) + val replicas = partitionDataForTopic2(1).replicas + assertEquals(replicas.size, 2) + assert(replicas(0).id == 0 || replicas(0).id == 1) + assert(replicas(1).id == 0 || replicas(1).id == 1) + } + + def testReplicaPlacement { + AddPartitionsCommand.addPartitions(zkClient, topic3, 6) + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500) + var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 3, 500) + var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 4, 500) + var leader5 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 5, 500) + var leader6 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 6, 500) + + val leader1FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 1).get + val leader2FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 2).get + val leader3FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 3).get + val leader4FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 4).get + val leader5FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 5).get + val leader6FromZk = ZkUtils.getLeaderForPartition(zkClient, topic3, 6).get + + assertEquals(leader1.get, leader1FromZk) + assertEquals(leader2.get, leader2FromZk) + assertEquals(leader3.get, leader3FromZk) + assertEquals(leader4.get, leader4FromZk) + assertEquals(leader5.get, leader5FromZk) + assertEquals(leader6.get, leader6FromZk) + + // read metadata from a broker and verify the new topic partitions exist + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5, 1000) + TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6, 1000) + + val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement", + 2000,0).topicsMetadata + + val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head + val partition1DataForTopic3 = metaDataForTopic3.partitionsMetadata(1) + val partition2DataForTopic3 = metaDataForTopic3.partitionsMetadata(2) + val partition3DataForTopic3 = metaDataForTopic3.partitionsMetadata(3) + val partition4DataForTopic3 = metaDataForTopic3.partitionsMetadata(4) + val partition5DataForTopic3 = metaDataForTopic3.partitionsMetadata(5) + val partition6DataForTopic3 = metaDataForTopic3.partitionsMetadata(6) + + assertEquals(partition1DataForTopic3.replicas.size, 4) + assertEquals(partition1DataForTopic3.replicas(0).id, 3) + assertEquals(partition1DataForTopic3.replicas(1).id, 2) + assertEquals(partition1DataForTopic3.replicas(2).id, 0) + assertEquals(partition1DataForTopic3.replicas(3).id, 1) + + assertEquals(partition2DataForTopic3.replicas.size, 4) + assertEquals(partition2DataForTopic3.replicas(0).id, 0) + assertEquals(partition2DataForTopic3.replicas(1).id, 3) + assertEquals(partition2DataForTopic3.replicas(2).id, 1) + assertEquals(partition2DataForTopic3.replicas(3).id, 2) + + assertEquals(partition3DataForTopic3.replicas.size, 4) + assertEquals(partition3DataForTopic3.replicas(0).id, 1) + assertEquals(partition3DataForTopic3.replicas(1).id, 0) + assertEquals(partition3DataForTopic3.replicas(2).id, 2) + assertEquals(partition3DataForTopic3.replicas(3).id, 3) + + assertEquals(partition4DataForTopic3.replicas.size, 4) + assertEquals(partition4DataForTopic3.replicas(0).id, 2) + assertEquals(partition4DataForTopic3.replicas(1).id, 3) + assertEquals(partition4DataForTopic3.replicas(2).id, 0) + assertEquals(partition4DataForTopic3.replicas(3).id, 1) + + assertEquals(partition5DataForTopic3.replicas.size, 4) + assertEquals(partition5DataForTopic3.replicas(0).id, 3) + assertEquals(partition5DataForTopic3.replicas(1).id, 0) + assertEquals(partition5DataForTopic3.replicas(2).id, 1) + assertEquals(partition5DataForTopic3.replicas(3).id, 2) + + assertEquals(partition6DataForTopic3.replicas.size, 4) + assertEquals(partition6DataForTopic3.replicas(0).id, 0) + assertEquals(partition6DataForTopic3.replicas(1).id, 1) + assertEquals(partition6DataForTopic3.replicas(2).id, 2) + assertEquals(partition6DataForTopic3.replicas(3).id, 3) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/3817857b/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 0d8b70f..dc0013f 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -157,7 +157,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val topic = "test" TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap @@ -166,7 +166,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i)) try { - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) fail("shouldn't be able to create a topic already exists") } catch { case e: TopicExistsException => // this is good @@ -181,7 +181,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // reassign partition 0 val newReplicas = Seq(0, 2, 3) val partitionToBeReassigned = 0 @@ -206,7 +206,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // reassign partition 0 val newReplicas = Seq(1, 2, 3) val partitionToBeReassigned = 0 @@ -232,7 +232,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // reassign partition 0 val newReplicas = Seq(2, 3) val partitionToBeReassigned = 0 @@ -273,7 +273,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val expectedReplicaAssignment = Map(0 -> List(0, 1)) val topic = "test" // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) // put the partition in the reassigned path as well // reassign partition 0 val newReplicas = Seq(0, 1) @@ -312,7 +312,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // broker 2 should be the leader since it was started first val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get @@ -333,7 +333,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_)) val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s)) // create the topic - AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient) TestUtils.waitUntilMetadataIsPropagated(servers, topic, partition, 1000)
