This is enough to get it to work:

df.save(conf.getString("ParquetOutputPath")+offsetSaved, "parquet",
SaveMode.Overwrite)

And tests so far (in local env) seem good with the edits. Yet to test
on the cluster. Cody, appreciate your thoughts on the edits.

Just want to make sure I am not doing an overkill or overseeing a
potential issue.

regards

Sunita


On Tue, Oct 25, 2016 at 2:38 PM, Sunita Arvind <sunitarv...@gmail.com>
wrote:

> The error in the file I just shared is here:
>
> val partitionOffsetPath:String = topicDirs.consumerOffsetDir + "/" + 
> partition._2(0);  --> this was just partition and hence there was an error
>
> fetching the offset.
>
> Still testing. Somehow Cody, your code never lead to file already exists sort 
> of errors (I am saving the output of the dstream
> as parquet file, after converting it to a dataframe. The batch interval will 
> be 2 hrs)
>
> The code in the main is here:
>
>   val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
> conf.getString("groupId"), conf.getString("topics"))
>    val storedOffsets = offsetsStore.readOffsets()
>  LogHandler.log.info("Fetched the offset from zookeeper")
>
>  val kafkaArr =  storedOffsets match {
>    case None =>
>      // start from the initial offsets
>      
> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>  kafkaProps, Set(topics))
>
>    case Some(fromOffsets) =>
>      // start from previously saved offsets
>      val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, 
> Array[Byte]) = (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, 
> mmd.message)
>      
> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder,Tuple2[String,
>  Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)
>
>      //KafkaUtils.createRDD[String,Row,StringDecoder,ProtobufMessage, 
> (String, Row)](sc, kafkaProps, fromOffsets, messageHandler)
>  }
>
>  kafkaArr.foreachRDD{ (rdd,time) =>
>
>         val schema = 
> SchemaConverters.toSqlType(BeaconAvroData.getClassSchema).dataType.asInstanceOf[StructType]
>         val ardd:RDD[Row] = rdd.mapPartitions{itr => itr.map(r => 
> Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
>         }
>         val df = sql.createDataFrame(ardd,schema)
>    LogHandler.log.info("Created dataframe")
>    val offsetSaved =  
> offsetsStore.saveOffsets(topics,rdd).replace(":","-").replace(",","_")
>    LogHandler.log.info("Saved offset to Zookeeper")
>    df.saveAsParquetFile(conf.getString("ParquetOutputPath")+offsetSaved)
>    LogHandler.log.info("Created the parquet file")
>  }
>
> Thanks
>
> Sunita
>
>
>
>
>
> On Tue, Oct 25, 2016 at 2:11 PM, Sunita Arvind <sunitarv...@gmail.com>
> wrote:
>
>> Attached is the edited code. Am I heading in right direction? Also, I am
>> missing something due to which, it seems to work well as long as the
>> application is running and the files are created right. But as soon as I
>> restart the application, it goes back to fromOffset as 0. Any thoughts?
>>
>> regards
>> Sunita
>>
>> On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarv...@gmail.com>
>> wrote:
>>
>>> Thanks for confirming Cody.
>>> To get to use the library, I had to do:
>>>
>>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), 
>>> "/consumers/topics/"+ topics + "/0")
>>>
>>> It worked well. However, I had to specify the partitionId in the
>>> zkPath.  If I want the library to pick all the partitions for a topic,
>>> without me specifying the path, is it possible out of the box or I need to
>>> tweak?
>>>
>>> regards
>>> Sunita
>>>
>>>
>>> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> You are correct that you shouldn't have to worry about broker id.
>>>>
>>>> I'm honestly not sure specifically what else you are asking at this
>>>> point.
>>>>
>>>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com>
>>>> wrote:
>>>> > Just re-read the kafka architecture. Something that slipped my mind
>>>> is, it
>>>> > is leader based. So topic/partitionId pair will be same on all the
>>>> brokers.
>>>> > So we do not need to consider brokerid while storing offsets. Still
>>>> > exploring rest of the items.
>>>> > regards
>>>> > Sunita
>>>> >
>>>> > On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <
>>>> sunitarv...@gmail.com>
>>>> > wrote:
>>>> >>
>>>> >> Hello Experts,
>>>> >>
>>>> >> I am trying to use the saving to ZK design. Just saw Sudhir's
>>>> comments
>>>> >> that it is old approach. Any reasons for that? Any issues observed
>>>> with
>>>> >> saving to ZK. The way we are planning to use it is:
>>>> >> 1. Following
>>>> >> http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
>>>> g-zero-data-loss.html
>>>> >> 2. Saving to the same file with offsetRange as a part of the file.
>>>> We hope
>>>> >> that there are no partial writes/ overwriting is possible and
>>>> offsetRanges
>>>> >>
>>>> >> However I have below doubts which I couldnt figure out from the code
>>>> here
>>>> >> -
>>>> >> https://github.com/ippontech/spark-kafka-source/blob/master/
>>>> src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>>>> >> 1. The brokerId is not part of the OffsetRange. How will just the
>>>> >> partitionId:FromOffset stay unique in a cluster with multiple
>>>> brokers and
>>>> >> multiple partitions/topic.
>>>> >> 2. Do we have to specify zkPath to include the partitionid. I tried
>>>> using
>>>> >> the ZookeeperOffsetStore as is and it required me to specify the
>>>> >> partitionId:
>>>> >>
>>>> >> val offsetsStore = new ZooKeeperOffsetsStore(conf.get
>>>> String("zkHosts"),
>>>> >> "/consumers/topics/"+ topics + "/0")
>>>> >>
>>>> >> For our usecases it is too limiting to include partitionId in the
>>>> path.
>>>> >> To get it to work by automatically detecting the existing partitions
>>>> for a
>>>> >> given topic, I changed it as below (inspired from
>>>> >> http://www.programcreek.com/java-api-examples/index.php?api=
>>>> kafka.utils.ZKGroupTopicDirs):
>>>> >>
>>>> >> /**
>>>> >>   * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
>>>> >>   * groupID consumer group to get offsets for
>>>> >>   * topic topic to get offsets for
>>>> >>   * return - mapping of (topic and) partition to offset
>>>> >>   */
>>>> >> private def getOffsets(groupID :String, topic:
>>>> String):Option[String] = {
>>>> >>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>>>> >>   val offsets = new mutable.HashMap[TopicAndPartition,Long]()
>>>> >>   val topicSeq = List(topic).toSeq
>>>> >>  // try {
>>>> >>     val partitions = ZkUtils.getPartitionsForTopics(zkClient,
>>>> topicSeq)
>>>> >>     var partition:Object=null
>>>> >>     for (partition <- partitions) {
>>>> >>       val partitionOffsetPath:String = topicDirs.consumerOffsetDir +
>>>> "/" +
>>>> >> partition;
>>>> >>       val maybeOffset:Option[String] = ZkUtils.readDataMaybeNull(zkCl
>>>> ient,
>>>> >> partitionOffsetPath)._1;
>>>> >>       val offset:Long = if(maybeOffset.isDefined)
>>>> maybeOffset.get.toLong
>>>> >> else 0L;
>>>> >>       val topicAndPartition:TopicAndPartition  = new
>>>> >> TopicAndPartition(topic, Integer.parseInt(partition.toString));
>>>> >>       offsets.put(topicAndPartition, offset)
>>>> >>     }
>>>> >>   //}
>>>> >> Option(offsets.mkString(","))
>>>> >> }
>>>> >>
>>>> >> // Read the previously saved offsets from Zookeeper
>>>> >> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>>>> >>
>>>> >>   LogHandler.log.info("Reading offsets from ZooKeeper")
>>>> >>
>>>> >>   val offsetsRangesStrOpt = getOffsets(consumerGrp,topic)
>>>> >>   val start = System.currentTimeMillis()
>>>> >>   offsetsRangesStrOpt match {
>>>> >>     case Some(offsetsRangesStr) =>
>>>> >>       LogHandler.log.debug(s"Read offset ranges:
>>>> ${offsetsRangesStr}")
>>>> >>
>>>> >>       val offsets = offsetsRangesStr.split(",")
>>>> >>         .map(s => s.split(":"))
>>>> >>         .map { case Array(partitionStr, offsetStr) =>
>>>> >> (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }
>>>> >>         .toMap
>>>> >>
>>>> >>       LogHandler.log.info("Done reading offsets from ZooKeeper.
>>>> Took " +
>>>> >> (System.currentTimeMillis() - start))
>>>> >>
>>>> >>       Some(offsets)
>>>> >>     case None =>
>>>> >>       LogHandler.log.info("No offsets found in ZooKeeper. Took " +
>>>> >> (System.currentTimeMillis() - start))
>>>> >>       None
>>>> >>   }
>>>> >>
>>>> >> }
>>>> >>
>>>> >> However, I am concerned if the saveOffsets will work well with this
>>>> >> approach. Thats when I realized we are not considering brokerIds
>>>> which
>>>> >> storing offsets and probably the OffsetRanges does not have it
>>>> either. It
>>>> >> can only provide Topic, partition, from and until offsets.
>>>> >>
>>>> >> I am probably missing something very basic. Probably the library
>>>> works
>>>> >> well by itself. Can someone/ Cody explain?
>>>> >>
>>>> >> Cody, Thanks a lot for sharing your work.
>>>> >>
>>>> >> regards
>>>> >> Sunita
>>>> >>
>>>> >>
>>>> >> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <c...@koeninger.org
>>>> >
>>>> >> wrote:
>>>> >>>
>>>> >>> See
>>>> >>> https://github.com/koeninger/kafka-exactly-once
>>>> >>>
>>>> >>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed"
>>>> >>> <mdkhajaasm...@gmail.com> wrote:
>>>> >>>>
>>>> >>>> Hi Experts,
>>>> >>>>
>>>> >>>> I am looking for some information on how to acheive zero data loss
>>>> while
>>>> >>>> working with kafka and Spark. I have searched online and blogs have
>>>> >>>> different answer. Please let me know if anyone has idea on this.
>>>> >>>>
>>>> >>>> Blog 1:
>>>> >>>>
>>>> >>>> https://databricks.com/blog/2015/01/15/improved-driver-fault
>>>> -tolerance-and-zero-data-loss-in-spark-streaming.html
>>>> >>>>
>>>> >>>>
>>>> >>>> Blog2:
>>>> >>>>
>>>> >>>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achievin
>>>> g-zero-data-loss.html
>>>> >>>>
>>>> >>>>
>>>> >>>> Blog one simply says configuration change with checkpoint
>>>> directory and
>>>> >>>> blog 2 give details about on how to save offsets to zoo keeper.
>>>> can you
>>>> >>>> please help me out with right approach.
>>>> >>>>
>>>> >>>> Thanks,
>>>> >>>> Asmath
>>>> >>>>
>>>> >>>>
>>>> >>
>>>> >
>>>>
>>>
>>>
>>
>

Reply via email to