The error in the file I just shared is here:

val partitionOffsetPath:String = topicDirs.consumerOffsetDir + "/" +
partition._2(0);

This was previously just partition, and hence there was an error
fetching the offset.
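
A minimal sketch of why the bare partition failed, assuming ZkUtils.getPartitionsForTopics returns a map from topic name to a sequence of partition ids. The map and the offset directory below are hypothetical stand-ins, not values read from ZooKeeper. Iterating over such a map yields (topic, partitionIds) tuples, so the tuple has to be unpacked. Note that partition._2(0) reads only the first partition id, whereas looping over every id covers all partitions of the topic:

```scala
object PartitionPathSketch {
  // Hypothetical stand-in for the result of ZkUtils.getPartitionsForTopics:
  // topic name -> partition ids (assumed shape, not fetched from ZooKeeper).
  val partitions: Map[String, Seq[Int]] = Map("beacon" -> Seq(0, 1, 2))

  // Illustrative value for topicDirs.consumerOffsetDir.
  val consumerOffsetDir = "/consumers/my-group/offsets/beacon"

  // Build one offset path per partition id instead of only the first one.
  def offsetPaths(offsetDir: String, parts: Map[String, Seq[Int]]): Seq[String] =
    for {
      (_, partitionIds) <- parts.toSeq
      partitionId <- partitionIds
    } yield s"$offsetDir/$partitionId"

  def main(args: Array[String]): Unit =
    offsetPaths(consumerOffsetDir, partitions).foreach(println)
}
```

Each resulting path could then be passed to ZkUtils.readDataMaybeNull in place of the single partitionOffsetPath.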

Still testing. Somehow, Cody, your code never led to "file already
exists" errors. (I am saving the output of the DStream as a Parquet
file after converting it to a DataFrame. The batch interval will be
2 hours.)

The code in the main is here:

  val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
    conf.getString("groupId"), conf.getString("topics"))
  val storedOffsets = offsetsStore.readOffsets()
  LogHandler.log.info("Fetched the offset from zookeeper")

  val kafkaArr = storedOffsets match {
    case None =>
      // start from the initial offsets
      KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
        ssc, kafkaProps, Set(topics))

    case Some(fromOffsets) =>
      // start from previously saved offsets
      val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, Array[Byte]) =
        (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message)
      KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder,
        Tuple2[String, Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)

      //KafkaUtils.createRDD[String,Row,StringDecoder,ProtobufMessage,
      //  (String, Row)](sc, kafkaProps, fromOffsets, messageHandler)
  }

  kafkaArr.foreachRDD { (rdd, time) =>
    val schema =
      SchemaConverters.toSqlType(BeaconAvroData.getClassSchema).dataType.asInstanceOf[StructType]
    val ardd: RDD[Row] = rdd.mapPartitions { itr =>
      itr.map(r => Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
    }
    val df = sql.createDataFrame(ardd, schema)
    LogHandler.log.info("Created dataframe")
    val offsetSaved =
      offsetsStore.saveOffsets(topics, rdd).replace(":", "-").replace(",", "_")
    LogHandler.log.info("Saved offset to Zookeeper")
    df.saveAsParquetFile(conf.getString("ParquetOutputPath") + offsetSaved)
    LogHandler.log.info("Created the parquet file")
  }
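
One thing to note about the loop above: the offsets are saved to ZooKeeper before the Parquet file is written, so a failure during the write could leave the stored offsets ahead of the data actually saved. A minimal sketch of the alternative ordering, using only plain Scala: BatchRange is a hypothetical stand-in for Spark's OffsetRange, and the write and commit functions are placeholders for the Parquet save and the ZooKeeper save. This is an illustration under those assumptions, not the library's behavior:

```scala
object BatchOutputSketch {
  // Hypothetical stand-in for Spark's OffsetRange (topic, partition, from, until).
  final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  // Deterministic suffix: the same offset ranges always map to the same name,
  // so a replayed batch targets the same path instead of a new one.
  def outputSuffix(ranges: Seq[BatchRange]): String =
    ranges.sortBy(_.partition)
      .map(r => s"${r.partition}-${r.fromOffset}-${r.untilOffset}")
      .mkString("_")

  // Write the batch first and commit offsets only afterwards, so a failed
  // write leaves the stored offsets pointing at the unwritten batch.
  def processBatch(ranges: Seq[BatchRange],
                   write: String => Unit,
                   commit: Seq[BatchRange] => Unit): Unit = {
    write(outputSuffix(ranges))
    commit(ranges)
  }

  def main(args: Array[String]): Unit = {
    val ranges = Seq(BatchRange("beacon", 1, 40L, 55L), BatchRange("beacon", 0, 100L, 120L))
    processBatch(ranges,
      suffix => println(s"write batch with suffix $suffix"),
      rs => println(s"commit ${rs.size} ranges"))
  }
}
```

With this ordering a batch may be written twice after a restart, so the write step would need to overwrite or skip an existing path for the same offset ranges.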

Thanks

Sunita





On Tue, Oct 25, 2016 at 2:11 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> Attached is the edited code. Am I heading in the right direction? Also, I
> am missing something: it seems to work well as long as the application is
> running, and the files are created correctly. But as soon as I restart the
> application, it goes back to a fromOffset of 0. Any thoughts?
>
> regards
> Sunita
>
> On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>
>> Thanks for confirming Cody.
>> To get to use the library, I had to do:
>>
>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
>> "/consumers/topics/"+ topics + "/0")
>>
>> It worked well. However, I had to specify the partitionId in the zkPath.
>> If I want the library to pick up all the partitions for a topic without my
>> specifying the path, is that possible out of the box, or do I need to
>> tweak it?
>>
>> regards
>> Sunita
>>
>>
>> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> You are correct that you shouldn't have to worry about broker id.
>>>
>>> I'm honestly not sure specifically what else you are asking at this
>>> point.
>>>
>>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com>
>>> wrote:
>>> > Just re-read the Kafka architecture. Something that slipped my mind is
>>> > that it is leader based, so a topic/partitionId pair will be the same
>>> > on all the brokers. So we do not need to consider the broker id while
>>> > storing offsets. Still exploring the rest of the items.
>>> > regards
>>> > Sunita
>>> >
>>> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarv...@gmail.com>
>>> > wrote:
>>> >>
>>> >> Hello Experts,
>>> >>
>>> >> I am trying to use the saving-to-ZK design. I just saw Sudhir's comment
>>> >> that it is an old approach. Any reasons for that? Any issues observed
>>> >> with saving to ZK? The way we are planning to use it is:
>>> >> 1. Following
>>> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>> >> 2. Saving to the same file with offsetRange as a part of the file. We
>>> >> hope that there are no partial writes/ overwriting is possible and
>>> >> offsetRanges
>>> >>
>>> >> However, I have the doubts below, which I couldn't figure out from the
>>> >> code here:
>>> >> https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>>> >> 1. The brokerId is not part of the OffsetRange. How will just
>>> >> partitionId:fromOffset stay unique in a cluster with multiple brokers
>>> >> and multiple partitions per topic?
>>> >> 2. Do we have to specify a zkPath that includes the partitionId? I tried
>>> >> using the ZooKeeperOffsetsStore as is, and it required me to specify
>>> >> the partitionId:
>>> >>
>>> >> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"),
>>> >>   "/consumers/topics/" + topics + "/0")
>>> >>
>>> >> For our use cases it is too limiting to include the partitionId in the
>>> >> path. To get it to work by automatically detecting the existing
>>> >> partitions for a given topic, I changed it as below (inspired by
>>> >> http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):
>>> >>
>>> >> /**
>>> >>   * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
>>> >>   * groupID consumer group to get offsets for
>>> >>   * topic topic to get offsets for
>>> >>   * return - mapping of (topic and) partition to offset
>>> >>   */
>>> >> private def getOffsets(groupID :String, topic: String):Option[String] = {
>>> >>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>>> >>   val offsets = new mutable.HashMap[TopicAndPartition,Long]()
>>> >>   val topicSeq = List(topic).toSeq
>>> >>  // try {
>>> >>     val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
>>> >>     var partition:Object=null
>>> >>     for (partition <- partitions) {
>>> >>       val partitionOffsetPath:String = topicDirs.consumerOffsetDir + "/" + partition;
>>> >>       val maybeOffset:Option[String] =
>>> >>         ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1;
>>> >>       val offset:Long = if(maybeOffset.isDefined) maybeOffset.get.toLong else 0L;
>>> >>       val topicAndPartition:TopicAndPartition =
>>> >>         new TopicAndPartition(topic, Integer.parseInt(partition.toString));
>>> >>       offsets.put(topicAndPartition, offset)
>>> >>     }
>>> >>   //}
>>> >>   Option(offsets.mkString(","))
>>> >> }
>>> >>
>>> >> // Read the previously saved offsets from Zookeeper
>>> >> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>>> >>
>>> >>   LogHandler.log.info("Reading offsets from ZooKeeper")
>>> >>
>>> >>   val offsetsRangesStrOpt = getOffsets(consumerGrp,topic)
>>> >>   val start = System.currentTimeMillis()
>>> >>   offsetsRangesStrOpt match {
>>> >>     case Some(offsetsRangesStr) =>
>>> >>       LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
>>> >>
>>> >>       val offsets = offsetsRangesStr.split(",")
>>> >>         .map(s => s.split(":"))
>>> >>         .map { case Array(partitionStr, offsetStr) =>
>>> >>           (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
>>> >>         .toMap
>>> >>
>>> >>       LogHandler.log.info("Done reading offsets from ZooKeeper. Took " +
>>> >>         (System.currentTimeMillis() - start))
>>> >>
>>> >>       Some(offsets)
>>> >>     case None =>
>>> >>       LogHandler.log.info("No offsets found in ZooKeeper. Took " +
>>> >>         (System.currentTimeMillis() - start))
>>> >>       None
>>> >>   }
>>> >>
>>> >> }
>>> >>
>>> >> However, I am concerned about whether saveOffsets will work well with
>>> >> this approach. That's when I realized we are not considering brokerIds
>>> >> while storing offsets, and the OffsetRanges probably do not have them
>>> >> either. They can only provide the topic, partition, and from and until
>>> >> offsets.
>>> >>
>>> >> I am probably missing something very basic. Probably the library works
>>> >> well by itself. Can someone (or Cody) explain?
>>> >>
>>> >> Cody, Thanks a lot for sharing your work.
>>> >>
>>> >> regards
>>> >> Sunita
>>> >>
>>> >>
>>> >> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <c...@koeninger.org>
>>> >> wrote:
>>> >>>
>>> >>> See
>>> >>> https://github.com/koeninger/kafka-exactly-once
>>> >>>
>>> >>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed"
>>> >>> <mdkhajaasm...@gmail.com> wrote:
>>> >>>>
>>> >>>> Hi Experts,
>>> >>>>
>>> >>>> I am looking for some information on how to achieve zero data loss
>>> >>>> while working with Kafka and Spark. I have searched online, and the
>>> >>>> blogs have different answers. Please let me know if anyone has an
>>> >>>> idea on this.
>>> >>>>
>>> >>>> Blog 1:
>>> >>>>
>>> >>>> https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
>>> >>>>
>>> >>>>
>>> >>>> Blog2:
>>> >>>>
>>> >>>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>> >>>>
>>> >>>>
>>> >>>> Blog 1 simply describes a configuration change with a checkpoint
>>> >>>> directory, and blog 2 gives details on how to save offsets to
>>> >>>> ZooKeeper. Can you please help me out with the right approach?
>>> >>>>
>>> >>>> Thanks,
>>> >>>> Asmath
>>> >>>>
>>> >>>>
>>> >>
>>> >
>>>
>>
>>
>
