You could try restarting the controller and running the same version on
the controller and the brokers.
BTW, is there a specific reason you are running two different versions on
the controller and the other brokers?

Thanks,

Mayuresh

On Tue, Mar 17, 2015 at 4:02 PM, Allen Wang <aw...@netflix.com.invalid>
wrote:

> Yes, the watcher is still alive. The log in the controller indicates that
> it observed the changes.
>
>
> On Tue, Mar 17, 2015 at 2:05 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
>
> > I think reassignment works asynchronously. Changes are made to
> > ZooKeeper, but they take effect only when the controller's watcher
> > fires for the respective ZooKeeper path. Is your watcher still alive?
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Mar 17, 2015 at 1:29 PM, Allen Wang <aw...@netflix.com.invalid>
> > wrote:
> >
> > > Looking a bit more into the controller log, it seems that when the
> > > partition assignment is changed in ZooKeeper, the controller gets quite a
> > > lot of exceptions communicating with the new brokers where the partitions
> > > are assigned. One thing to note is that the new brokers run Kafka version
> > > 0.8.2.1 and the controller runs Kafka version 0.8.1.1.
> > >
> > > 2015-03-16 22:36:58,178 WARN  kafka.utils.Logging$class:89
> > > [Controller-2-to-broker-48-send-thread] [warn]
> > > [Controller-2-to-broker-48-send-thread], Controller 2 fails to send a
> > > request to broker id:48,host:xyz:7101
> > > java.io.EOFException: Received -1 when reading from channel, socket has
> > > likely been closed.
> > >         at kafka.utils.Utils$.read(Utils.scala:376)
> > >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > >         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > >         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > >         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > >         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
> > >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > >
> > > Does it explain why the brokers are not aware of the new assignments? Is
> > > there any way to recover from this communication problem, like restarting
> > > the controller?
> > >
> > > Thanks,
> > > Allen
> > >
> > >
> > > On Tue, Mar 17, 2015 at 10:34 AM, Allen Wang <aw...@netflix.com> wrote:
> > >
> > > > Hello,
> > > >
> > > > I developed a tool to add partitions and assign new partitions to a set
> > > > of brokers in one operation by utilizing the API
> > > > AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
> > > >
> > > > It worked well in most cases. However, in one case, I found that the
> > > > brokers are not aware of new partitions assigned to them, even though
> > > > the zookeeper data clearly shows the assignment.
> > > >
> > > > Here is the zookeeper data for the partition:
> > > >
> > > > {"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
> > > >
> > > > On broker 62, the error message is:
> > > >
> > > > 2015-03-17 17:11:57,157 WARN  kafka.utils.Logging$class:83
> > > > [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with
> > > > correlation id 2048464 from client x on partition [m,71] failed due to
> > > > Partition [m,71] doesn't exist on 62
> > > >
> > > > Here is the core function of the tool:
> > > >
> > > >   def addPartitionsToTopic(zkClient: ZkClient, topic: String,
> > > >       brokersToAssignPartitions: Seq[Int], brokers: Seq[Int],
> > > >       execute: Boolean): Unit = {
> > > >     val existingPartitionsReplicaList =
> > > >       ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
> > > >     val config = AdminUtils.fetchTopicConfig(zkClient, topic)
> > > >     printf("Topic config: %s\n\n", config)
> > > >     if (existingPartitionsReplicaList.size == 0)
> > > >       throw new AdminOperationException("The topic %s does not exist".format(topic))
> > > >     val currentPartitions = existingPartitionsReplicaList.size
> > > >     val replicationFactor = existingPartitionsReplicaList.map(e => e._2.size).max
> > > >     val brokersWithPartitions = existingPartitionsReplicaList.flatMap(e => e._2).toSet.toSeq
> > > >     // Skip if any target broker already hosts a replica of this topic
> > > >     if (brokersToAssignPartitions.intersect(brokersWithPartitions).size > 0) {
> > > >       printf("Topic %s already has partitions on brokers %s. Skipping.\n",
> > > >         topic, brokersToAssignPartitions)
> > > >       return
> > > >     }
> > > >     val totalBrokers = brokers.size
> > > >     val oldBrokers = totalBrokers - brokersToAssignPartitions.size
> > > >     if (oldBrokers == 0) {
> > > >       throw new IllegalArgumentException(
> > > >         "Cannot add partitions to new brokers without existing partitions")
> > > >     }
> > > >     // Scale the partition count with the broker count (integer division truncates)
> > > >     val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
> > > >     val newPartitions = expectedPartitions - currentPartitions
> > > >     if (newPartitions <= 0) {
> > > >       throw new IllegalArgumentException(
> > > >         "Invalid number of new partitions %d".format(newPartitions))
> > > >     }
> > > >     // Assign only the new partitions, and only to the new brokers
> > > >     val newPartitionReplicaList =
> > > >       AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions,
> > > >         newPartitions, replicationFactor, startPartitionId = currentPartitions)
> > > >     val partitionReplicaList =
> > > >       existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
> > > >     // add the new list
> > > >     partitionReplicaList ++= newPartitionReplicaList
> > > >     printf("Changing number of partitions from %d to %d to topic %s\n\n",
> > > >       currentPartitions, expectedPartitions, topic)
> > > >     printf("Replica reassignment for new partitions:\n\n%s\n\n",
> > > >       getAssignmentJson(topic, newPartitionReplicaList))
> > > >     printf("Complete replica assignment:\n\n%s\n\n",
> > > >       getAssignmentJson(topic, partitionReplicaList))
> > > >     if (execute) {
> > > >       // Write the combined (existing + new) assignment to ZooKeeper
> > > >       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient,
> > > >         topic, partitionReplicaList, config, update = true)
> > > >       println("New partitions are added")
> > > >     } else {
> > > >       println("No update is executed in dry run mode")
> > > >     }
> > > >   }
> > > >
> > > > It seems to me that the new assignment in ZooKeeper data does not
> > > > propagate to some of the new brokers. However, looking at TopicCommand,
> > > > it uses the same AdminUtils function to add new partitions.
> > > >
> > > > Am I missing anything, or is this a bug in the broker?
> > > >
> > > > Thanks,
> > > > Allen
> > > >
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
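
For reference, the partition-count arithmetic in the quoted tool
(expectedPartitions = currentPartitions * totalBrokers / oldBrokers) can be
checked in isolation, without ZooKeeper or Kafka on the classpath. The sketch
below is only an illustration: PartitionMath and newPartitionCount are
hypothetical names that do not appear in the tool or in Kafka, and require()
stands in for the tool's explicit IllegalArgumentException checks.

```scala
object PartitionMath {
  // Hypothetical helper mirroring the arithmetic in the quoted
  // addPartitionsToTopic; not part of Kafka or of the original tool.
  def newPartitionCount(currentPartitions: Int, totalBrokers: Int, newBrokers: Int): Int = {
    val oldBrokers = totalBrokers - newBrokers
    // require() throws IllegalArgumentException, like the tool's own checks
    require(oldBrokers > 0,
      "Cannot add partitions to new brokers without existing partitions")
    // Integer division truncates, so the result can round down to
    // currentPartitions when the broker count grows only slightly
    val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
    val newPartitions = expectedPartitions - currentPartitions
    require(newPartitions > 0, s"Invalid number of new partitions $newPartitions")
    newPartitions
  }
}
```

For example, a topic with 64 partitions on 8 brokers that grows to 10 brokers
would get 64 * 10 / 8 - 64 = 16 new partitions, while a topic with 1 partition
growing from 9 to 10 brokers would be rejected because the division truncates
to 1 and leaves nothing to add.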
