just about done with the trunk merge from 0.8 but ran into something here
private def deletePartitionOwnershipFromZK(topic: String, partition: String) { val topicDirs = new ZKGroupTopicDirs(group, topic) val znode = topicDirs.consumerOwnerDir + "/" + partition deletePath(zkClient, znode) debug("Consumer " + consumerIdString + " releasing " + znode) } private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Partition, PartitionTopicInfo]])= { info("Releasing partition ownership") <<<<<<< .working for ((topic, infos) <- topicRegistry) { for(partition <- infos.keys) { val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.toString) deletePath(zkClient, partitionOwnerPath) debug("Consumer " + consumerIdString + " releasing " + partitionOwnerPath) } ======= for ((topic, infos) <- localTopicRegistry) { for(partition <- infos.keys) deletePartitionOwnershipFromZK(topic, partition.toString) localTopicRegistry.remove(topic) >>>>>>> .merge-right.r1342339 } } looks like between KAFKA-300, 239 and 286 not sure what we want to-do here, thoughts? /* Joe Stein http://www.linkedin.com/in/charmalloc Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop> */