Hello Experts,

I am trying to use the design that saves offsets to ZooKeeper (ZK). I just saw
Sudhir's comment that it is an old approach. Are there reasons for that? Have
any issues been observed with saving to ZK? The way we are planning to use it is:
1. Following http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
2. Saving to the same file with offsetRange as a part of the file. We hope
that there are no partial writes/ overwriting is possible and offsetRanges

However, I have the doubts below, which I could not resolve from the code here:
https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
1. The brokerId is not part of the OffsetRange. How will just
partitionId:fromOffset stay unique in a cluster with multiple brokers and
multiple partitions/topic?
2. Do we have to specify a zkPath that includes the partitionId? I tried using
the ZooKeeperOffsetsStore as is, and it required me to specify the
partitionId:

val offsetsStore = new ZooKeeperOffsetsStore(
  conf.getString("zkHosts"), "/consumers/topics/" + topics + "/0")

For our use cases it is too limiting to include the partitionId in the path.
To get it to work by automatically detecting the existing partitions
for a given topic, I changed it as below (inspired by
http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}

/**
  * groupID consumer group to get offsets for
  * topic topic to get offsets for
  * return - comma-separated "partition:offset" pairs, or None if no partitions were found
  */
private def getOffsets(groupID: String, topic: String): Option[String] = {
  val topicDirs = new ZKGroupTopicDirs(groupID, topic)
  // getPartitionsForTopics returns a map of topic -> partition ids,
  // so look up the partitions for this topic rather than iterating the map itself
  val partitions: Seq[Int] =
    ZkUtils.getPartitionsForTopics(zkClient, Seq(topic)).getOrElse(topic, Seq.empty)
  val offsets = partitions.map { partition =>
    val partitionOffsetPath = topicDirs.consumerOffsetDir + "/" + partition
    val maybeOffset: Option[String] =
      ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1
    // No offset saved yet for this partition: start from 0
    val offset = maybeOffset.map(_.toLong).getOrElse(0L)
    // Emit the "partition:offset" format that readOffsets parses
    s"$partition:$offset"
  }
  if (offsets.isEmpty) None else Some(offsets.mkString(","))
}

// Read the previously saved offsets from Zookeeper
override def readOffsets: Option[Map[TopicAndPartition, Long]] = {

  LogHandler.log.info("Reading offsets from ZooKeeper")

  // Start the timer before the read so the logged duration includes it
  val start = System.currentTimeMillis()
  val offsetsRangesStrOpt = getOffsets(consumerGrp, topic)
  offsetsRangesStrOpt match {
    case Some(offsetsRangesStr) =>
      LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")

      // Expected format: "partition:offset,partition:offset,..."
      val offsets = offsetsRangesStr.split(",")
        .map(s => s.split(":"))
        .map { case Array(partitionStr, offsetStr) =>
          (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong)
        }
        .toMap

      LogHandler.log.info("Done reading offsets from ZooKeeper. Took " +
        (System.currentTimeMillis() - start))

      Some(offsets)
    case None =>
      LogHandler.log.info("No offsets found in ZooKeeper. Took " +
        (System.currentTimeMillis() - start))
      None
  }

}
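
The string format that readOffsets parses can be checked in isolation. Below is a minimal sketch in plain Scala, with no Kafka or ZooKeeper dependency. OffsetCodec, encode and decode are hypothetical names used only for illustration; the comma-separated partition:offset layout is the one the parsing code above relies on.

```scala
object OffsetCodec {
  // Encode partition -> offset pairs as "p1:o1,p2:o2,...", sorted by partition id.
  def encode(offsets: Map[Int, Long]): String =
    offsets.toSeq.sortBy(_._1).map { case (p, o) => s"$p:$o" }.mkString(",")

  // Decode the same format. Returns None for an empty string or if any
  // entry is not a well-formed "partition:offset" pair.
  def decode(s: String): Option[Map[Int, Long]] =
    if (s.isEmpty) None
    else {
      val parsed = s.split(",").toSeq.map(_.split(":")).map {
        case Array(p, o) => scala.util.Try(p.toInt -> o.toLong).toOption
        case _           => None
      }
      if (parsed.forall(_.isDefined)) Some(parsed.flatten.toMap) else None
    }
}
```

Note that calling mkString(",") directly on a Map does not produce this layout, so the pairs have to be formatted explicitly before joining them.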

However, I am concerned about whether saveOffsets will work well with this
approach. That is when I realized we are not considering brokerIds when
storing offsets, and the OffsetRange probably does not have it either. It
can only provide the topic, partition, and from and until offsets.
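
To make the save side concrete, here is a rough sketch of how per-partition writes could mirror the path layout read above. It is only an illustration under stated assumptions: the OffsetRange case class is a hypothetical stand-in for Spark's class with the same four fields, OffsetWrites and its methods are made-up names, and saving fromOffset follows the partitionId:fromOffset convention mentioned earlier. No ZooKeeper call is made; the actual write (for example via ZkUtils.updatePersistentPath) is left out.

```scala
// Hypothetical stand-in for Spark's OffsetRange: topic, partition, from and until offsets.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object OffsetWrites {
  // Same layout as ZKGroupTopicDirs.consumerOffsetDir plus the partition id:
  // /consumers/<group>/offsets/<topic>/<partition>
  def offsetPath(group: String, r: OffsetRange): String =
    s"/consumers/$group/offsets/${r.topic}/${r.partition}"

  // One (path, value) pair per partition; the key contains no broker id.
  // Assumption: fromOffset is the value to persist.
  def toWrites(group: String, ranges: Seq[OffsetRange]): Map[String, String] =
    ranges.map(r => offsetPath(group, r) -> r.fromOffset.toString).toMap
}
```

With this layout each partition gets its own node, so the zkPath passed to the store would not need to contain a partitionId.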

I am probably missing something very basic. Probably the library works well
by itself. Can someone, or Cody, explain?

Cody, Thanks a lot for sharing your work.

regards
Sunita

On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <c...@koeninger.org> wrote:

> See
> https://github.com/koeninger/kafka-exactly-once
> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed" <mdkhajaasm...@gmail.com>
> wrote:
>
>> Hi Experts,
>>
>> I am looking for some information on how to achieve zero data loss while
>> working with Kafka and Spark. I have searched online, and the blogs give
>> different answers. Please let me know if anyone has an idea on this.
>>
>> Blog 1:
>> https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
>>
>>
>> Blog 2:
>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>
>>
>> Blog 1 simply describes a configuration change with a checkpoint directory, and
>> blog 2 gives details on how to save offsets to ZooKeeper. Can you
>> please help me out with the right approach?
>>
>> Thanks,
>> Asmath
>>
>>
>>
