This is enough to get it to work:

val df.save = df.save(conf.getString("ParquetOutputPath") + offsetSaved, "parquet", SaveMode.Overwrite)
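For reference, a minimal sketch of the same write through the DataFrameWriter API that superseded df.save from Spark 1.4 onward, assuming the same conf and offsetSaved values as above:

import org.apache.spark.sql.SaveMode

// Equivalent write via DataFrameWriter; Overwrite clears any partial
// output left behind by a failed earlier attempt at the same batch.
df.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .save(conf.getString("ParquetOutputPath") + offsetSaved)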
And tests so far (in local env) seem good with the edits. Yet to test on the cluster. Cody, appreciate your thoughts on the edits. Just want to make sure I am not overengineering this or overlooking a potential issue.

regards
Sunita

On Tue, Oct 25, 2016 at 2:38 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
> The error in the file I just shared is here:
>
> val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition._2(0)  // --> this was just `partition`, and hence there was an error fetching the offset
>
> Still testing. Somehow, Cody, your code never led to "file already exists" sorts of errors. (I am saving the output of the DStream as a parquet file after converting it to a dataframe. The batch interval will be 2 hrs.)
>
> The code in the main is here:
>
> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), conf.getString("groupId"), conf.getString("topics"))
> val storedOffsets = offsetsStore.readOffsets()
> LogHandler.log.info("Fetched the offset from zookeeper")
>
> val kafkaArr = storedOffsets match {
>   case None =>
>     // start from the initial offsets
>     KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaProps, Set(topics))
>   case Some(fromOffsets) =>
>     // start from previously saved offsets
>     val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, Array[Byte]) =
>       (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message)
>     KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder, (String, Array[Byte])](ssc, kafkaProps, fromOffsets, messageHandler)
>     // KafkaUtils.createRDD[String, Row, StringDecoder, ProtobufMessage, (String, Row)](sc, kafkaProps, fromOffsets, messageHandler)
> }
>
> kafkaArr.foreachRDD { (rdd, time) =>
>   val schema = SchemaConverters.toSqlType(BeaconAvroData.getClassSchema).dataType.asInstanceOf[StructType]
>   val ardd: RDD[Row] = rdd.mapPartitions { itr =>
>     itr.map(r => Row.fromSeq(AvroUtils.avroToList(AvrodataUtils.getAvroData(r._2)).toArray))
>   }
>   val df = sql.createDataFrame(ardd, schema)
>   LogHandler.log.info("Created dataframe")
>   val offsetSaved = offsetsStore.saveOffsets(topics, rdd).replace(":", "-").replace(",", "_")
>   LogHandler.log.info("Saved offset to Zookeeper")
>   df.saveAsParquetFile(conf.getString("ParquetOutputPath") + offsetSaved)
>   LogHandler.log.info("Created the parquet file")
> }
>
> Thanks
> Sunita
>
> On Tue, Oct 25, 2016 at 2:11 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>> Attached is the edited code. Am I heading in the right direction? Also, I am missing something, because it seems to work well as long as the application is running and the files are created right. But as soon as I restart the application, it goes back to a fromOffset of 0. Any thoughts?
>>
>> regards
>> Sunita
>>
>> On Tue, Oct 25, 2016 at 1:52 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>> Thanks for confirming, Cody.
>>> To get the library to work, I had to do:
>>>
>>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), "/consumers/topics/" + topics + "/0")
>>>
>>> It worked well. However, I had to specify the partitionId in the zkPath. If I want the library to pick up all the partitions for a topic without me specifying the path, is that possible out of the box, or do I need to tweak it?
>>>
>>> regards
>>> Sunita
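A note on the foreachRDD block quoted above: the offsets are committed to ZooKeeper before the parquet write. A minimal sketch of the reverse ordering, which gives at-least-once behavior if the job dies between the two steps (the batch is replayed rather than lost). Here buildDataFrame is a hypothetical helper wrapping the Avro-to-Row logic above, and offsetsStore, conf, and topics are assumed from the surrounding code:

import org.apache.spark.streaming.kafka.HasOffsetRanges

kafkaArr.foreachRDD { (rdd, time) =>
  // Derive the file label from the RDD's own offset ranges rather than
  // from the return value of saveOffsets.
  val offsetLabel = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    .map(r => s"${r.partition}-${r.untilOffset}")
    .mkString("_")
  val df = buildDataFrame(rdd) // hypothetical helper, see note above
  df.saveAsParquetFile(conf.getString("ParquetOutputPath") + offsetLabel)
  // Commit offsets only after the write has succeeded, so a failure
  // in between replays the batch instead of dropping it.
  offsetsStore.saveOffsets(topics, rdd)
}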
>>> On Tue, Oct 25, 2016 at 12:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>> You are correct that you shouldn't have to worry about broker id.
>>>>
>>>> I'm honestly not sure specifically what else you are asking at this point.
>>>>
>>>> On Tue, Oct 25, 2016 at 1:39 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>>>> Just re-read the Kafka architecture. Something that slipped my mind is that it is leader-based, so a topic/partitionId pair will be the same on all the brokers. So we do not need to consider the brokerId while storing offsets. Still exploring the rest of the items.
>>>>>
>>>>> regards
>>>>> Sunita
>>>>>
>>>>> On Tue, Oct 25, 2016 at 11:09 AM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>>>>> Hello Experts,
>>>>>>
>>>>>> I am trying to use the saving-to-ZK design. I just saw Sudhir's comments that it is an old approach. Any reasons for that? Any issues observed with saving to ZK? The way we are planning to use it is:
>>>>>> 1. Following http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>>>>> 2. Saving to the same file with the offsetRange as part of the file name. We hope that there are no partial writes and that overwriting with the same offsetRange is possible.
>>>>>>
>>>>>> However, I have the doubts below, which I couldn't figure out from the code here:
>>>>>> https://github.com/ippontech/spark-kafka-source/blob/master/src/main/scala/com/ippontech/kafka/stores/ZooKeeperOffsetsStore.scala
>>>>>> 1. The brokerId is not part of the OffsetRange. How will just partitionId:fromOffset stay unique in a cluster with multiple brokers and multiple partitions per topic?
>>>>>> 2. Do we have to specify the zkPath to include the partitionId? I tried using the ZooKeeperOffsetsStore as-is, and it required me to specify the partitionId:
>>>>>>
>>>>>> val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), "/consumers/topics/" + topics + "/0")
>>>>>>
>>>>>> For our use cases it is too limiting to include the partitionId in the path. To get it to work by automatically detecting the existing partitions for a given topic, I changed it as below (inspired by http://www.programcreek.com/java-api-examples/index.php?api=kafka.utils.ZKGroupTopicDirs):
>>>>>>
>>>>>> /**
>>>>>>  * zkServers Zookeeper server string: host1:port1[,host2:port2,...]
>>>>>>  * groupID   consumer group to get offsets for
>>>>>>  * topic     topic to get offsets for
>>>>>>  * return    mapping of (topic and) partition to offset
>>>>>>  */
>>>>>> private def getOffsets(groupID: String, topic: String): Option[String] = {
>>>>>>   val topicDirs = new ZKGroupTopicDirs(groupID, topic)
>>>>>>   val offsets = new mutable.HashMap[TopicAndPartition, Long]()
>>>>>>   val topicSeq = List(topic).toSeq
>>>>>>   val partitions = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
>>>>>>   for (partition <- partitions) {
>>>>>>     val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition
>>>>>>     val maybeOffset: Option[String] = ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1
>>>>>>     val offset: Long = if (maybeOffset.isDefined) maybeOffset.get.toLong else 0L
>>>>>>     val topicAndPartition: TopicAndPartition = new TopicAndPartition(topic, Integer.parseInt(partition.toString))
>>>>>>     offsets.put(topicAndPartition, offset)
>>>>>>   }
>>>>>>   Option(offsets.mkString(","))
>>>>>> }
>>>>>>
>>>>>> // Read the previously saved offsets from Zookeeper
>>>>>> override def readOffsets: Option[Map[TopicAndPartition, Long]] = {
>>>>>>   LogHandler.log.info("Reading offsets from ZooKeeper")
>>>>>>   val offsetsRangesStrOpt = getOffsets(consumerGrp, topic)
>>>>>>   val start = System.currentTimeMillis()
>>>>>>   offsetsRangesStrOpt match {
>>>>>>     case Some(offsetsRangesStr) =>
>>>>>>       LogHandler.log.debug(s"Read offset ranges: ${offsetsRangesStr}")
>>>>>>       val offsets = offsetsRangesStr.split(",")
>>>>>>         .map(s => s.split(":"))
>>>>>>         .map { case Array(partitionStr, offsetStr) => TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong }
>>>>>>         .toMap
>>>>>>       LogHandler.log.info("Done reading offsets from ZooKeeper. Took " + (System.currentTimeMillis() - start))
>>>>>>       Some(offsets)
>>>>>>     case None =>
>>>>>>       LogHandler.log.info("No offsets found in ZooKeeper. Took " + (System.currentTimeMillis() - start))
>>>>>>       None
>>>>>>   }
>>>>>> }
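For comparison, a sketch against the same Kafka 0.8.x APIs that skips the string round-trip: it iterates the partition ids directly (a Seq[Int], so no tuple and no ._2(0) fix-up as in the bug discussed at the top of the thread) and returns the map that readOffsets ultimately wants. zkClient is assumed to be the same connection used above:

import kafka.common.TopicAndPartition
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient

// Sketch: read one offset node per partition and build the map directly,
// instead of serializing to a string and re-parsing it in readOffsets.
private def getOffsets(zkClient: ZkClient, groupId: String,
                       topic: String): Option[Map[TopicAndPartition, Long]] = {
  val topicDirs = new ZKGroupTopicDirs(groupId, topic)
  // getPartitionsForTopics maps topic -> Seq[partitionId]
  val partitionIds = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
    .getOrElse(topic, Seq.empty)
  if (partitionIds.isEmpty) None
  else Some(partitionIds.map { p =>
    val (maybeOffset, _) = ZkUtils.readDataMaybeNull(zkClient, s"${topicDirs.consumerOffsetDir}/$p")
    TopicAndPartition(topic, p) -> maybeOffset.map(_.toLong).getOrElse(0L)
  }.toMap)
}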
>>>>>>
>>>>>> However, I am concerned whether saveOffsets will work well with this approach. That's when I realized we are not considering brokerIds while storing offsets, and OffsetRanges probably does not have them either; it can only provide the topic, partition, and from and until offsets.
>>>>>>
>>>>>> I am probably missing something very basic. Probably the library works well by itself. Can someone/Cody explain?
>>>>>>
>>>>>> Cody, thanks a lot for sharing your work.
>>>>>>
>>>>>> regards
>>>>>> Sunita
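On the saveOffsets concern: a minimal sketch of a per-partition save built from the RDD's offset ranges, mirroring the layout that the getOffsets above reads. As Cody notes above, broker ids never enter into it; (topic, partition) identifies an offset on its own. zkClient is again an assumed existing connection:

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Sketch: write one ZooKeeper node per partition with the batch's
// untilOffset, so the next run resumes where this one left off.
private def saveOffsets(zkClient: ZkClient, groupId: String,
                        topic: String, rdd: RDD[_]): Unit = {
  val topicDirs = new ZKGroupTopicDirs(groupId, topic)
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { range =>
    ZkUtils.updatePersistentPath(zkClient,
      s"${topicDirs.consumerOffsetDir}/${range.partition}",
      range.untilOffset.toString)
  }
}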
>>>>>> On Tue, Aug 23, 2016 at 11:21 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>> See
>>>>>>> https://github.com/koeninger/kafka-exactly-once
>>>>>>>
>>>>>>> On Aug 23, 2016 10:30 AM, "KhajaAsmath Mohammed" <mdkhajaasm...@gmail.com> wrote:
>>>>>>>> Hi Experts,
>>>>>>>>
>>>>>>>> I am looking for some information on how to achieve zero data loss while working with Kafka and Spark. I have searched online, and the blogs have different answers. Please let me know if anyone has an idea on this.
>>>>>>>>
>>>>>>>> Blog 1:
>>>>>>>> https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
>>>>>>>>
>>>>>>>> Blog 2:
>>>>>>>> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>>>>>>>>
>>>>>>>> Blog 1 simply says a configuration change with a checkpoint directory, and Blog 2 gives details on how to save offsets to ZooKeeper. Can you please help me out with the right approach?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Asmath
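As background for Blog 1's approach, a minimal sketch of the checkpoint-based recovery it describes; the checkpoint directory is a placeholder, and on restart StreamingContext.getOrCreate restores the context, including the direct stream's Kafka offsets, from the checkpoint instead of ZooKeeper:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder path; in practice this should live on a fault-tolerant
// filesystem such as HDFS.
val checkpointDir = "/tmp/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("zero-data-loss-sketch")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... create the direct stream and wire up output operations here ...
  ssc
}

// Clean start: runs createContext. After a driver failure: rebuilds the
// context, offsets included, from the checkpoint data.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()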