Hello,
I developed a tool that adds partitions to a topic and assigns the new
partitions to a set of brokers in one operation, using the API
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
It has worked well in most cases. However, in one case I found that the
brokers were not aware of the new partitions assigned to them, even though
the ZooKeeper data clearly showed the assignment.
Here is the ZooKeeper data for the partition:
{"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
On broker 62, the error message is:
2015-03-17 17:11:57,157 WARN kafka.utils.Logging$class:83 [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with correlation id 2048464 from client x on partition [m,71] failed due to Partition [m,71] doesn't exist on 62
Here is the core function of the tool:
def addPartitionsToTopic(zkClient: ZkClient, topic: String,
    brokersToAssignPartitions: Seq[Int], brokers: Seq[Int], execute: Boolean): Unit = {
  val existingPartitionsReplicaList =
    ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
  val config = AdminUtils.fetchTopicConfig(zkClient, topic)
  printf("Topic config: %s\n\n", config)
  if (existingPartitionsReplicaList.size == 0)
    throw new AdminOperationException("The topic %s does not exist".format(topic))
  val currentPartitions = existingPartitionsReplicaList.size
  val replicationFactor = existingPartitionsReplicaList.map(e => e._2.size).max
  val brokersWithPartitions = existingPartitionsReplicaList.flatMap(e => e._2).toSet.toSeq
  if (brokersToAssignPartitions.intersect(brokersWithPartitions).size > 0) {
    printf("Topic %s already has partitions on brokers %s. Skipping.\n",
      topic, brokersToAssignPartitions)
    return
  }
  val totalBrokers = brokers.size
  val oldBrokers = totalBrokers - brokersToAssignPartitions.size
  if (oldBrokers == 0) {
    throw new IllegalArgumentException(
      "Cannot add partitions to new brokers without existing partitions")
  }
  val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
  val newPartitions = expectedPartitions - currentPartitions
  if (newPartitions <= 0) {
    throw new IllegalArgumentException(
      "Invalid number of new partitions %d".format(newPartitions))
  }
  val newPartitionReplicaList =
    AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions,
      newPartitions, replicationFactor, startPartitionId = currentPartitions)
  val partitionReplicaList = existingPartitionsReplicaList.map(p =>
    p._1.partition -> p._2)
  // add the new list
  partitionReplicaList ++= newPartitionReplicaList
  printf("Changing number of partitions from %d to %d to topic %s\n\n",
    currentPartitions, expectedPartitions, topic)
  printf("Replica reassignment for new partitions:\n\n%s\n\n",
    getAssignmentJson(topic, newPartitionReplicaList))
  printf("Complete replica assignment:\n\n%s\n\n",
    getAssignmentJson(topic, partitionReplicaList))
  if (execute) {
    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient,
      topic, partitionReplicaList, config, update = true)
    println("New partitions are added")
  } else {
    println("No update is executed in dry run mode")
  }
}
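To make the partition-count arithmetic in the function above easier to check in isolation, here is a minimal sketch with no Kafka dependency. The object name, the method name, and the broker and partition counts are hypothetical and are not taken from the real cluster; only the formula mirrors the tool.

```scala
object PartitionCountSketch {
  // Same arithmetic as in addPartitionsToTopic: scale the partition count
  // in proportion to the growth in broker count.
  // All names here are illustrative, not part of Kafka.
  def newPartitionCount(currentPartitions: Int, totalBrokers: Int, newBrokers: Int): Int = {
    val oldBrokers = totalBrokers - newBrokers
    require(oldBrokers > 0, "need at least one existing broker")
    // Integer division: any fractional part is truncated.
    val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
    expectedPartitions - currentPartitions
  }

  def main(args: Array[String]): Unit = {
    // 70 partitions, 7 old brokers + 1 new: 70 * 8 / 7 = 80, so 10 new partitions.
    println(newPartitionCount(70, 8, 1))
    // 10 partitions, 3 old brokers + 1 new: 10 * 4 / 3 = 13 (truncated), so 3 new partitions.
    println(newPartitionCount(10, 4, 1))
  }
}
```

Because of the integer division, the number of new partitions is rounded down whenever the current partition count is not a multiple of the number of old brokers.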
It seems to me that the new assignment in the ZooKeeper data did not
propagate to some of the new brokers. However, TopicCommand uses the same
AdminUtils function to add new partitions.
Am I missing something, or is this a bug in the broker?
Thanks,
Allen