I think reassignment works asynchronously. The changes are written to
ZooKeeper, but they take effect only when the controller's watcher fires
for the respective ZooKeeper path. Is your watcher still alive?
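
To illustrate what asynchronous means here, below is a toy model in plain
Scala. It is only a sketch and not Kafka or ZooKeeper code: WatchedPath,
ToyController and WatcherDemo are hypothetical names made up for this
example, and the partition and broker ids are arbitrary. It shows that a
write to the store is visible to the controller only if a watcher is
registered at the time of the write.

```scala
// Toy stand-in for a ZooKeeper path: holds data plus at most one pending
// one-shot watcher. Hypothetical class, not part of any real library.
final class WatchedPath[A](initial: A) {
  private var data: A = initial
  private var watcher: Option[A => Unit] = None

  // Register a one-shot watcher; it is cleared after it fires once.
  def watch(callback: A => Unit): Unit = {
    watcher = Some(callback)
  }

  // Store the new value, then notify the watcher if one is registered.
  def write(value: A): Unit = {
    data = value
    val pending = watcher
    watcher = None
    pending.foreach(callback => callback(value))
  }

  def read: A = data
}

// Toy controller: its view of the assignment changes only when its
// watcher fires, never as a direct side effect of the write.
final class ToyController(path: WatchedPath[Map[Int, Seq[Int]]]) {
  var knownAssignment: Map[Int, Seq[Int]] = path.read

  def startWatching(): Unit = path.watch { updated =>
    knownAssignment = updated
    startWatching() // re-register, because the watch is one-shot
  }
}

object WatcherDemo {
  // Returns (update seen without a watcher, update seen with a watcher).
  def run(): (Boolean, Boolean) = {
    val path = new WatchedPath[Map[Int, Seq[Int]]](Map(0 -> Seq(1, 2)))
    val controller = new ToyController(path)

    // No watcher registered: the data changes, the controller does not notice.
    path.write(Map(0 -> Seq(1, 2), 1 -> Seq(62, 74)))
    val seenWithoutWatcher = controller.knownAssignment.contains(1)

    // Live watcher: the next write reaches the controller.
    controller.startWatching()
    path.write(Map(0 -> Seq(1, 2), 1 -> Seq(62, 74), 2 -> Seq(74, 62)))
    val seenWithWatcher = controller.knownAssignment.contains(2)

    (seenWithoutWatcher, seenWithWatcher)
  }
}
```

In this model the first write stays invisible to the controller even though
the stored data is correct, which is the same symptom as ZooKeeper showing
an assignment that the brokers never hear about.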

Thanks,

Mayuresh

On Tue, Mar 17, 2015 at 1:29 PM, Allen Wang <aw...@netflix.com.invalid>
wrote:

> Looking a bit more into the controller log, it seems that when the partition
> assignment is changed in ZooKeeper, the controller hits quite a lot of
> exceptions communicating with the new brokers where the partitions are
> assigned. One thing to note is that the new brokers run Kafka version
> 0.8.2.1 and the controller runs Kafka version 0.8.1.1.
>
> 2015-03-16 22:36:58,178 WARN  kafka.utils.Logging$class:89 [Controller-2-to-broker-48-send-thread] [warn] [Controller-2-to-broker-48-send-thread], Controller 2 fails to send a request to broker id:48,host:xyz:7101
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>         at kafka.utils.Utils$.read(Utils.scala:376)
>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> Does this explain why the brokers are not aware of the new assignments? Is
> there any way to recover from this communication problem, such as restarting
> the controller?
>
> Thanks,
> Allen
>
>
> On Tue, Mar 17, 2015 at 10:34 AM, Allen Wang <aw...@netflix.com> wrote:
>
> > Hello,
> >
> > I developed a tool to add partitions and assign new partitions to a set of
> > brokers in one operation by utilizing the API
> > AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
> >
> > It worked well in most cases. However, in one case, I found that the
> > brokers are not aware of new partitions assigned to them, even though the
> > zookeeper data clearly shows the assignment.
> >
> > Here is the zookeeper data for the partition:
> >
> > {"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
> >
> > On broker 62, the error message is:
> >
> > 2015-03-17 17:11:57,157 WARN  kafka.utils.Logging$class:83 [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with correlation id 2048464 from client x on partition [m,71] failed due to Partition [m,71] doesn't exist on 62
> >
> > Here is the core function of the tool:
> >
> >   def addPartitionsToTopic(zkClient: ZkClient, topic: String,
> >       brokersToAssignPartitions: Seq[Int], brokers: Seq[Int],
> >       execute: Boolean): Unit = {
> >     val existingPartitionsReplicaList =
> >       ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
> >     val config = AdminUtils.fetchTopicConfig(zkClient, topic)
> >     printf("Topic config: %s\n\n", config)
> >     if (existingPartitionsReplicaList.size == 0)
> >       throw new AdminOperationException("The topic %s does not exist".format(topic))
> >     val currentPartitions = existingPartitionsReplicaList.size
> >     val replicationFactor = existingPartitionsReplicaList.map(e => e._2.size).max
> >     val brokersWithPartitions = existingPartitionsReplicaList.flatMap(e => e._2).toSet.toSeq
> >     if (brokersToAssignPartitions.intersect(brokersWithPartitions).size > 0) {
> >       printf("Topic %s already has partitions on brokers %s. Skipping.\n",
> >         topic, brokersToAssignPartitions)
> >       return
> >     }
> >     val totalBrokers = brokers.size
> >     val oldBrokers = totalBrokers - brokersToAssignPartitions.size
> >     if (oldBrokers == 0) {
> >       throw new IllegalArgumentException(
> >         "Cannot add partitions to new brokers without existing partitions")
> >     }
> >     val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
> >     val newPartitions = expectedPartitions - currentPartitions
> >     if (newPartitions <= 0) {
> >       throw new IllegalArgumentException(
> >         "Invalid number of new partitions %d".format(newPartitions))
> >     }
> >     val newPartitionReplicaList = AdminUtils.assignReplicasToBrokers(
> >       brokersToAssignPartitions, newPartitions, replicationFactor,
> >       startPartitionId = currentPartitions)
> >     val partitionReplicaList =
> >       existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
> >     // add the new list
> >     partitionReplicaList ++= newPartitionReplicaList
> >     printf("Changing number of partitions from %d to %d to topic %s\n\n",
> >       currentPartitions, expectedPartitions, topic)
> >     printf("Replica reassignment for new partitions:\n\n%s\n\n",
> >       getAssignmentJson(topic, newPartitionReplicaList))
> >     printf("Complete replica assignment:\n\n%s\n\n",
> >       getAssignmentJson(topic, partitionReplicaList))
> >     if (execute) {
> >       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient,
> >         topic, partitionReplicaList, config, update = true)
> >       println("New partitions are added")
> >     } else {
> >       println("No update is executed in dry run mode")
> >     }
> >   }
> >
> > It seems to me that the new assignment in the ZooKeeper data does not
> > propagate to some of the new brokers. However, looking at TopicCommand, it
> > uses the same AdminUtils function to add new partitions.
> >
> > Am I missing anything, or is this a bug in the broker?
> >
> > Thanks,
> > Allen
> >
> >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
