Attached is the edited code. Am I heading in right direction? Also, I am
missing something due to which, it seems to work well as long as the
application is running and the files are created right. But as soon as I
restart the application, it goes back to fromOffset as 0. Any thoughts?

regards
Sunita

On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> Thanks for confirming Cody.
> To get to use the library, I had to do:
>
> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
> "/consumers/topics/"+ topics + "/0")
>
> It worked well. However, I had to specify the partitionId in the zkPath.
> If I want the library to pick all the partitions for a topic, without me
> specifying the path, is it possible out of the box or I need to tweak?
>
> regards
> Sunita
>
>
> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> You are correct that you shouldn't have to worry about broker id.
>>
>> I'm honestly not sure specifically what else you are asking at this point.
>>
>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com>
>> wrote:
>> > Just re-read the kafka architecture. Something that slipped my mind is,
>> it
>> > is leader based. So topic/partitionId pair will be same on all the
>> brokers.
>> > So we do not need to consider brokerid while storing offsets. Still
>> > exploring rest of the items.
>> > regards
>> > Sunita
>> >
>> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarv...@gmail.com>
>> > wrote:
>> >>
>> >> Hello Experts,
>> >>
>> >> I am trying to use the saving to ZK design. Just saw Sudhir's comments
>> >> that it is old approach. Any reasons for that? Any issues observed with
>> >> saving to ZK. The way we are planning to use it is:
>> >> 1. Following
>> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
>> g-zero-data-loss.html
>> >> 2. Saving to the same file with offsetRange as a part of the file. We
>> hope
>> >> that there are no partial writes/ overwriting is possible and
>> offsetRanges
>> >>
>> >> However I have below doubts which I couldnt figure out from the code
>> here
>> >> -
>> >> https://github.com/ippontech/spark-kafka-source/blob/master/
>> src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>> >> 1. The brokerId is not part of the OffsetRange. How will just the
>> >> partitionId:FromOffset stay unique in a cluster with multiple brokers
>> and
>> >> multiple partitions/topic.
>> >> 2. Do we have to specify zkPath to include the partitionid. I tried
>> using
>> >> the ZookeeperOffsetStore as is and it required me to specify the
>> >> partitionId:
>> >>
>> >> val offsetsStore = new ZooKeeperOffsetsStore(conf.get
>> String("zkHosts"),
>> >> "/consumers/topics/"+ topics + "/0")
>> >>
>> >> For our usecases it is too limiting to include partitionId in the path.
>> >> To get it to work by automatically detecting the existing partitions
>> for a
>> >> given topic, I changed it as below (inspired from
>> >> http://www.programcreek.com/java-api-examples/index.php?api=
>> kafka.utils.ZKGroupTopicDirs):
>> >>
>> >> /**
>> >>   * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
>> >>   * groupID consumer group to get offsets for
>> >>   * topic topic to get offsets for
>> >>   * return - mapping of (topic and) partition to offset
>> >>   */
>> >> private def getOffsets(groupID :String, topic: String):Option[String]
>> = {
>> >>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>> >>   val offsets = new mutable.HashMap[TopicAndPartition,Long]()
>> >>   val topicSeq = List(topic).toSeq
>> >>  // try {
>> >>     val partitions = ZkUtils.getPartitionsForTopics(zkClient,
>> topicSeq)
>> >>     var partition:Object=null
>> >>     for (partition <- partitions) {
>> >>       val partitionOffsetPath:String = topicDirs.consumerOffsetDir +
>> "/" +
>> >> partition;
>> >>       val maybeOffset:Option[String] = ZkUtils.readDataMaybeNull(zkCl
>> ient,
>> >> partitionOffsetPath)._1;
>> >>       val offset:Long = if(maybeOffset.isDefined)
>> maybeOffset.get.toLong
>> >> else 0L;
>> >>       val topicAndPartition:TopicAndPartition  = new
>> >> TopicAndPartition(topic, Integer.parseInt(partition.toString));
>> >>       offsets.put(topicAndPartition, offset)
>> >>     }
>> >>   //}
>> >> Option(offsets.mkString(","))
>> >> }
>> >>
>> >> // Read the previously saved offsets from Zookeeper
>> >> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>> >>
>> >>   LogHandler.log.info("Reading offsets from ZooKeeper")
>> >>
>> >>   val offsetsRangesStrOpt = getOffsets(consumerGrp,topic)
>> >>   val start = System.currentTimeMillis()
>> >>   offsetsRangesStrOpt match {
>> >>     case Some(offsetsRangesStr) =>
>> >>       LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
>> >>
>> >>       val offsets = offsetsRangesStr.split(",")
>> >>         .map(s => s.split(":"))
>> >>         .map { case Array(partitionStr, offsetStr) =>
>> >> (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
>> >>         .toMap
>> >>
>> >>       LogHandler.log.info("Done reading offsets from ZooKeeper. Took
>> " +
>> >> (System.currentTimeMillis() - start))
>> >>
>> >>       Some(offsets)
>> >>     case None =>
>> >>       LogHandler.log.info("No offsets found in ZooKeeper. Took " +
>> >> (System.currentTimeMillis() - start))
>> >>       None
>> >>   }
>> >>
>> >> }
>> >>
>> >> However, I am concerned if the saveOffsets will work well with this
>> >> approach. Thats when I realized we are not considering brokerIds which
>> >> storing offsets and probably the OffsetRanges does not have it either.
>> It
>> >> can only provide Topic, partition, from and until offsets.
>> >>
>> >> I am probably missing something very basic. Probably the library works
>> >> well by itself. Can someone/ Cody explain?
>> >>
>> >> Cody, Thanks a lot for sharing your work.
>> >>
>> >> regards
>> >> Sunita
>> >>
>> >>
>> >> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <c...@koeninger.org>
>> >> wrote:
>> >>>
>> >>> See
>> >>> https://github.com/koeninger/kafka-exactly-once
>> >>>
>> >>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed"
>> >>> <mdkhajaasm...@gmail.com> wrote:
>> >>>>
>> >>>> Hi Experts,
>> >>>>
>> >>>> I am looking for some information on how to acheive zero data loss
>> while
>> >>>> working with kafka and Spark. I have searched online and blogs have
>> >>>> different answer. Please let me know if anyone has idea on this.
>> >>>>
>> >>>> Blog 1:
>> >>>>
>> >>>> https://databricks.com/blog/2015/01/15/improved-driver-fault
>> -tolerance-and-zero-data-loss-in-spark-streaming.html
>> >>>>
>> >>>>
>> >>>> Blog2:
>> >>>>
>> >>>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
>> g-zero-data-loss.html
>> >>>>
>> >>>>
>> >>>> Blog one simply says configuration change with checkpoint directory
>> and
>> >>>> blog 2 give details about on how to save offsets to zoo keeper. can
>> you
>> >>>> please help me out with right approach.
>> >>>>
>> >>>> Thanks,
>> >>>> Asmath
>> >>>>
>> >>>>
>> >>
>> >
>>
>
>

Attachment: ZookeeperOffsetStore_AutoPartitionDetect.scala
Description: Binary data

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to