For specific offsets you can directly pass the offset ranges and use
KafkaUtils.createRDD to get the events that were missed in the DStream.
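Something along these lines (an untested sketch against the Spark 1.x
spark-streaming-kafka API; `sc` is your SparkContext, and the broker, topic,
and offsets below are placeholders for your own values):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    // One range per topic partition: fromOffset is inclusive,
    // untilOffset is exclusive.
    val offsetRanges = Array(OffsetRange("sample_sample3_json", 0, 0L, 1000L))

    // Returns a batch RDD of (key, value) covering exactly these ranges.
    val missedEvents = KafkaUtils.createRDD[String, String, StringDecoder,
      StringDecoder](sc, kafkaParams, offsetRanges)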
- Thanks, via mobile, excuse brevity.

On Jan 25, 2016 3:33 PM, "Raju Bairishetti" <r...@apache.org> wrote:

> Hi Yash,
>    Basically, my question is how to avoid storing the Kafka offsets in the
> Spark checkpoint directory. The streaming context is getting built from the
> checkpoint directory and proceeding with the offsets in the checkpointed
> RDD.
>
> I want to consume data from Kafka from specific offsets along with the
> Spark checkpoints. The streaming context is getting prepared from the
> checkpoint directory and starts consuming from the topic offsets which
> were stored in the checkpoint directory.
>
> On Sat, Jan 23, 2016 at 3:44 PM, Yash Sharma <yash...@gmail.com> wrote:
>
>> Hi Raju,
>> Could you please explain your expected behavior with the DStream? The
>> DStream will have events only from the 'fromOffsets' that you provided in
>> createDirectStream (which I think is the expected behavior).
>>
>> For the smaller files, you will have to deal with small files if you
>> intend to write them immediately. Alternatively, what we do sometimes is:
>>
>> 1. Buffer a couple of iterations, for some 30-40 seconds, in the
>> application until we have substantial data, and then write it to disk.
>> 2. Push the smaller data back to Kafka, and a different job handles the
>> save to disk.
>>
>> On Sat, Jan 23, 2016 at 7:01 PM, Raju Bairishetti <r...@apache.org>
>> wrote:
>>
>>> Thanks for the quick reply.
>>> I am creating the Kafka DStream by passing an offsets map. I have pasted
>>> a code snippet in my earlier mail. Let me know if I am missing something.
>>>
>>> I want to use the Spark checkpoint for handling only driver/executor
>>> failures.
>>>
>>> On Jan 22, 2016 10:08 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>>
>>>> Offsets are stored in the checkpoint. If you want to manage offsets
>>>> yourself, don't restart from the checkpoint; specify the starting
>>>> offsets when you create the stream.
>>>>
>>>> Have you read / watched the materials linked from
>>>> https://github.com/koeninger/kafka-exactly-once ?
>>>>
>>>> Regarding the small files problem, either don't use HDFS, or use
>>>> something like filecrush for merging.
>>>>
>>>> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <r...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am very new to Spark & Spark Streaming. I am planning to use Spark
>>>>> Streaming for real-time processing.
>>>>>
>>>>> I have created a streaming context and am checkpointing to an HDFS
>>>>> directory for recovery purposes in case of executor & driver failures.
>>>>>
>>>>> I am creating the DStream with an offset map for getting the data from
>>>>> Kafka. I am simply ignoring the offsets to understand the behavior.
>>>>> Whenever I restart the application, the driver is restored from the
>>>>> checkpoint as expected, but the DStream is not getting started from the
>>>>> initial offsets. The DStream is created with the last consumed offsets
>>>>> instead of starting from offset 0 for each topic partition, even though
>>>>> I am not storing the offsets anywhere.
>>>>>
>>>>> def main(args: Array[String]): Unit = {
>>>>>   val sparkStreamingContext =
>>>>>     StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>>>>       () => creatingFunc())
>>>>>   ...
>>>>> }
>>>>>
>>>>> // Must return the new StreamingContext for getOrCreate to use.
>>>>> def creatingFunc(): StreamingContext = {
>>>>>   ...
>>>>>   val offsets: Map[TopicAndPartition, Long] =
>>>>>     Map(TopicAndPartition("sample_sample3_json", 0) -> 0)
>>>>>
>>>>>   KafkaUtils.createDirectStream[String, String, StringDecoder,
>>>>>     StringDecoder, String](sparkStreamingContext, kafkaParams, offsets,
>>>>>     messageHandler)
>>>>>   ...
>>>>> }
>>>>>
>>>>> I want to get control over offset management at the event level instead
>>>>> of the RDD level, to make sure of at-least-once delivery to the end
>>>>> system.
>>>>>
>>>>> As per my understanding, every RDD or RDD partition will be stored in
>>>>> HDFS as a file if I choose to use HDFS as output. If I use 1 sec as the
>>>>> batch interval then it will end up creating a huge number of small
>>>>> files in HDFS, and having small files in HDFS leads to lots of other
>>>>> issues. Is there any way to write multiple RDDs into a single file? I
>>>>> don't have much idea about *coalesce* usage. In the worst case, I can
>>>>> merge all the small files in HDFS at regular intervals.
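For the *coalesce* question above: a rough, untested sketch. coalesce(1)
collapses each batch into a single output file, though it cannot merge files
across batches (here `stream` is assumed to be the DStream returned by your
createDirectStream call):

    import org.apache.spark.streaming.Time

    stream.foreachRDD { (rdd, time: Time) =>
      if (!rdd.isEmpty()) {
        // All partitions of this batch are collapsed into one part-file,
        // written under a per-batch directory named by the batch time.
        rdd.coalesce(1)
          .saveAsTextFile(s"hdfs:///events/batch-${time.milliseconds}")
      }
    }

A longer batch interval, or a window such as stream.window(Seconds(30),
Seconds(30)), buffers more data per write, along the lines of option 1
Yash mentioned.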
>>>>> Thanks...
>>>>>
>>>>> ------
>>>>> Thanks
>>>>> Raju Bairishetti
>>>>> www.lazada.com
>
> --
> ------
> Thanks
> Raju Bairishetti
> www.lazada.com