Attached is the edited code. Am I heading in the right direction? Also, I am missing something due to which it seems to work well as long as the application is running and the files are created correctly. But as soon as I restart the application, it goes back to fromOffset as 0. Any thoughts?
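In case it helps narrow it down, here is a minimal, standalone sketch of the parsing I expect on the read side (the helper name parseStoredOffsets is made up for illustration; it is not part of the library). The key point is returning None when nothing is stored, instead of defaulting every partition to 0, so the stream can fall back to auto.offset.reset on a fresh start:

```scala
// Hypothetical helper: turn the raw string read back from ZooKeeper into
// per-partition offsets. Returning None (rather than a map full of 0s)
// when nothing was stored lets the consumer fall back to
// auto.offset.reset instead of silently restarting from offset 0.
// Assumed storage format: "partition:offset" pairs joined by commas.
def parseStoredOffsets(raw: Option[String]): Option[Map[Int, Long]] =
  raw.filter(_.nonEmpty).map { s =>
    s.split(",").map { pair =>
      val Array(partition, offset) = pair.split(":")
      partition.toInt -> offset.toLong
    }.toMap
  }
```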
regards
Sunita

On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
> Thanks for confirming, Cody.
> To get to use the library, I had to do:
>
>     val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>       "/consumers/topics/" + topics + "/0")
>
> It worked well. However, I had to specify the partitionId in the zkPath.
> If I want the library to pick up all the partitions for a topic, without me
> specifying the path, is that possible out of the box, or do I need to tweak it?
>
> regards
> Sunita
>
> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> You are correct that you shouldn't have to worry about broker id.
>>
>> I'm honestly not sure specifically what else you are asking at this point.
>>
>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>> Just re-read the Kafka architecture. Something that slipped my mind is that it
>>> is leader based, so a topic/partitionId pair will be the same on all the
>>> brokers, and we do not need to consider brokerId while storing offsets. Still
>>> exploring the rest of the items.
>>> regards
>>> Sunita
>>>
>>> On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>>> Hello Experts,
>>>>
>>>> I am trying to use the saving-to-ZK design. I just saw Sudhir's comment
>>>> that it is an old approach. Any reasons for that? Any issues observed with
>>>> saving to ZK? The way we are planning to use it is:
>>>> 1. Following
>>>>    http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>>> 2. Saving to the same file with the offsetRange as part of the file. We hope
>>>>    that no partial writes/overwriting are possible.
>>>>
>>>> However, I have the doubts below, which I couldn't figure out from the code here:
>>>> https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>>>> 1. The brokerId is not part of the OffsetRange. How will just
>>>>    partitionId:fromOffset stay unique in a cluster with multiple brokers and
>>>>    multiple partitions per topic?
>>>> 2. Do we have to specify the zkPath to include the partitionId? I tried using
>>>>    the ZooKeeperOffsetsStore as is, and it required me to specify the
>>>>    partitionId:
>>>>
>>>>     val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>>>>       "/consumers/topics/" + topics + "/0")
>>>>
>>>> For our use cases it is too limiting to include the partitionId in the path.
>>>> To get it to work by automatically detecting the existing partitions for a
>>>> given topic, I changed it as below (inspired by
>>>> http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):
>>>>
>>>>     /**
>>>>      * zkServers - ZooKeeper server string: host1:port1[,host2:port2,...]
>>>>      * groupID   - consumer group to get offsets for
>>>>      * topic     - topic to get offsets for
>>>>      * return    - mapping of (topic and) partition to offset
>>>>      */
>>>>     private def getOffsets(groupID: String, topic: String): Option[String] = {
>>>>       val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>>>>       val offsets = new mutable.HashMap[TopicAndPartition, Long]()
>>>>       val partitions = ZkUtils.getPartitionsForTopics(zkClient, List(topic).toSeq)
>>>>       for (partition <- partitions) {
>>>>         val partitionOffsetPath = topicDirs.consumerOffsetDir + "/" + partition
>>>>         val maybeOffset = ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1
>>>>         val offset: Long = if (maybeOffset.isDefined) maybeOffset.get.toLong else 0L
>>>>         val topicAndPartition = TopicAndPartition(topic, Integer.parseInt(partition.toString))
>>>>         offsets.put(topicAndPartition, offset)
>>>>       }
>>>>       Option(offsets.mkString(","))
>>>>     }
>>>>
>>>>     // Read the previously saved offsets from ZooKeeper
>>>>     override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>>>>       LogHandler.log.info("Reading offsets from ZooKeeper")
>>>>       val start = System.currentTimeMillis()
>>>>       val offsetsRangesStrOpt = getOffsets(consumerGrp, topic)
>>>>       offsetsRangesStrOpt match {
>>>>         case Some(offsetsRangesStr) =>
>>>>           LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
>>>>           val offsets = offsetsRangesStr.split(",")
>>>>             .map(s => s.split(":"))
>>>>             .map { case Array(partitionStr, offsetStr) =>
>>>>               TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong
>>>>             }
>>>>             .toMap
>>>>           LogHandler.log.info("Done reading offsets from ZooKeeper. Took " +
>>>>             (System.currentTimeMillis() - start))
>>>>           Some(offsets)
>>>>         case None =>
>>>>           LogHandler.log.info("No offsets found in ZooKeeper. Took " +
>>>>             (System.currentTimeMillis() - start))
>>>>           None
>>>>       }
>>>>     }
>>>>
>>>> However, I am concerned whether saveOffsets will work well with this
>>>> approach. That's when I realized we are not considering brokerIds when
>>>> storing offsets, and OffsetRanges probably does not have them either. It
>>>> can only provide topic, partition, from and until offsets.
>>>>
>>>> I am probably missing something very basic. Probably the library works
>>>> well by itself. Can someone/Cody explain?
>>>>
>>>> Cody, thanks a lot for sharing your work.
>>>>
>>>> regards
>>>> Sunita
>>>>
>>>> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>> See
>>>>> https://github.com/koeninger/kafka-exactly-once
>>>>>
>>>>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed" <mdkhajaasm...@gmail.com> wrote:
>>>>>> Hi Experts,
>>>>>>
>>>>>> I am looking for some information on how to achieve zero data loss while
>>>>>> working with Kafka and Spark. I have searched online, and the blogs have
>>>>>> different answers. Please let me know if anyone has an idea on this.
>>>>>>
>>>>>> Blog 1:
>>>>>> https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
>>>>>>
>>>>>> Blog 2:
>>>>>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>>>>>
>>>>>> Blog 1 simply suggests a configuration change with the checkpoint directory,
>>>>>> while blog 2 gives details on how to save offsets to ZooKeeper. Can you
>>>>>> please help me out with the right approach?
>>>>>>
>>>>>> Thanks,
>>>>>> Asmath
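P.S. One thing worth double-checking in the code quoted above: getOffsets serializes the map with offsets.mkString(","), which appears to render each entry as a tuple (something like "([topic,0],42)"), while readOffsets splits on ":" expecting plain partition:offset pairs. A small symmetric pair of helpers would keep the write and read formats in sync; this is only a sketch (formatOffsets and parseOffsets are illustrative names, not part of the library, and the keys here are bare partition ids rather than TopicAndPartition):

```scala
// Sketch of a "partition:offset,partition:offset" wire format.
// Writing and reading through the same pair of helpers avoids the
// serialization and parsing sides drifting apart across restarts.
def formatOffsets(offsets: Map[Int, Long]): String =
  offsets.toSeq.sortBy(_._1).map { case (p, o) => s"$p:$o" }.mkString(",")

def parseOffsets(s: String): Map[Int, Long] =
  s.split(",").map { pair =>
    val Array(p, o) = pair.split(":")
    p.toInt -> o.toLong
  }.toMap
```

A quick round trip, e.g. parseOffsets(formatOffsets(m)), should give back the original map for any non-empty m.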
ZookeeperOffsetStore_AutoPartitionDetect.scala
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org