Thanks for the quick reply. I am creating the Kafka DStream by passing an offsets map. I have pasted the code snippet in my earlier mail. Let me know if I am missing something.
I want to use the Spark checkpoint for handling only driver/executor failures.

On Jan 22, 2016 10:08 PM, "Cody Koeninger" <c...@koeninger.org> wrote:

> Offsets are stored in the checkpoint. If you want to manage offsets
> yourself, don't restart from the checkpoint; specify the starting offsets
> when you create the stream.
>
> Have you read / watched the materials linked from
>
> https://github.com/koeninger/kafka-exactly-once
>
> Regarding the small files problem, either don't use HDFS, or use something
> like filecrush for merging.
>
> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <r...@apache.org> wrote:
>
>> Hi,
>>
>> I am very new to Spark & Spark Streaming. I am planning to use Spark
>> Streaming for real-time processing.
>>
>> I have created a streaming context and am checkpointing to an HDFS directory
>> for recovery purposes in case of executor failures & driver failures.
>>
>> I am creating a DStream with an offset map for getting the data from Kafka.
>> I am simply ignoring the offsets to understand the behavior. Whenever I
>> restart the application, the driver is restored from the checkpoint as
>> expected, but the DStream is not getting started from the initial offsets.
>> The DStream was created with the last consumed offsets instead of starting
>> from offset 0 for each topic partition, even though I am not storing the
>> offsets anywhere.
>>
>> def main : Unit = {
>>
>>   var sparkStreamingContext =
>>     StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>       () => creatingFunc())
>>
>>   ...
>> }
>>
>> def creatingFunc(): Unit = {
>>
>>   ...
>>
>>   var offsets: Map[TopicAndPartition, Long] =
>>     Map(TopicAndPartition("sample_sample3_json", 0) -> 0)
>>
>>   KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
>>     String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>
>>   ...
>> }
>>
>> I want to get control over offset management at the event level instead of
>> the RDD level, to make sure of at-least-once delivery to the end system.
>>
>> As per my understanding, every RDD or RDD partition will be stored in HDFS
>> as a file if I choose to use HDFS as output. If I use 1 sec as the batch
>> interval then I will end up having a huge number of small files in HDFS.
>> Having small files in HDFS will lead to lots of other issues.
>> Is there any way to write multiple RDDs into a single file? I don't have
>> much idea about *coalesce* usage. In the worst case, I can merge all small
>> files in HDFS at regular intervals.
>>
>> Thanks...
>>
>> ------
>> Thanks
>> Raju Bairishetti
>> www.lazada.com
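
To make the suggestion above concrete, here is a minimal sketch of managing offsets outside the checkpoint with the same Spark 1.x direct-stream API used in the snippet. The loadOffsets/saveOffsets helpers stand in for whatever external store you choose (Zookeeper, a DB, HBase, ...); they are hypothetical placeholders, not part of any Spark or Kafka API, and the broker address and topic are assumptions.

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    object OffsetManagedStream {

      // Hypothetical helpers for an external offset store; replace with your own.
      def loadOffsets(): Map[TopicAndPartition, Long] =
        Map(TopicAndPartition("sample_sample3_json", 0) -> 0L)   // fall back to 0 on first run
      def saveOffsets(ranges: Array[OffsetRange]): Unit = ()

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("offset-managed-stream")
        // No getOrCreate here: the stream is always built from explicitly supplied
        // offsets, so the checkpoint never decides where consumption starts.
        val ssc = new StreamingContext(conf, Seconds(1))

        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd.message()

        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
          StringDecoder, String](ssc, kafkaParams, loadOffsets(), messageHandler)

        stream.foreachRDD { rdd =>
          // Offset ranges are only available on the original Kafka RDD,
          // before any transformation.
          val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // ... write rdd to the sink here ...
          saveOffsets(ranges)   // commit only after the batch output succeeded
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

Because the stream is always built from explicitly supplied offsets, a restart does not depend on the checkpoint to decide where consumption resumes, and committing offsets only after the batch output has succeeded gives at-least-once delivery.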
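
On the small-files question: coalesce only reduces the number of partitions (and hence output files) of a single batch RDD; it cannot merge output across batches, so with a 1-second interval you still get at least one file per batch. A rough sketch, reusing the stream from the snippet above (the HDFS path is made up):

    stream.foreachRDD { (rdd, time) =>
      // Collapse the batch to one partition so it is written as a single file
      // instead of one file per Kafka partition. This does not merge files
      // across batches; for that, use a longer batch interval or a periodic
      // compaction job such as filecrush.
      rdd.coalesce(1).saveAsTextFile(s"hdfs:///tmp/stream-out/batch-${time.milliseconds}")
    }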