Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint
For specific offsets, you can pass the offset ranges directly to KafkaUtils.createRDD to fetch the events that were missed in the DStream.

- Thanks, via mobile, excuse brevity.

On Jan 25, 2016 3:33 PM, "Raju Bairishetti" wrote:
> Hi Yash,
> Basically, my question is how to avoid storing the Kafka offsets in the
> Spark checkpoint directory. The streaming context is built from the
> checkpoint directory and proceeds with the offsets in the checkpointed RDD.
>
> I want to consume data from Kafka from specific offsets while still using
> Spark checkpoints. The streaming context is restored from the checkpoint
> directory and starts consuming from the topic offsets that were stored
> there.
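A minimal sketch of the KafkaUtils.createRDD suggestion in the reply above, assuming the Spark 1.x spark-streaming-kafka (Kafka 0.8) artifact used elsewhere in this thread. The broker address, the object name and the offset range 0 to 100 are placeholders for illustration, not values from the thread.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object ReplayMissedOffsets {
  // Hypothetical gap to replay: partition 0 of the topic,
  // from offset 0 (inclusive) up to offset 100 (exclusive).
  def missedRanges: Array[OffsetRange] =
    Array(OffsetRange("sample_sample3_json", 0, 0L, 100L))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("replay-missed-offsets"))

    // Assumed broker address; replace with your own broker list.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    // Batch read of exactly the requested ranges, as (key, value) pairs.
    val missed: RDD[(String, String)] =
      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
        sc, kafkaParams, missedRanges)

    // Print a few of the replayed message values.
    missed.map(_._2).take(10).foreach(println)
    sc.stop()
  }
}
```

Because createRDD is a plain batch read, it does not depend on the streaming checkpoint, so it can run as a separate job next to the streaming application.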
Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint
Hi Yash,

Basically, my question is how to avoid storing the Kafka offsets in the Spark checkpoint directory. The streaming context is built from the checkpoint directory and proceeds with the offsets in the checkpointed RDD.

I want to consume data from Kafka from specific offsets while still using Spark checkpoints. The streaming context is restored from the checkpoint directory and starts consuming from the topic offsets that were stored there.

On Sat, Jan 23, 2016 at 3:44 PM, Yash Sharma wrote:
> Hi Raju,
> Could you please explain the behavior you expect from the DStream? The
> DStream will have events only from the 'fromOffsets' that you provided in
> createDirectStream (which I think is the expected behavior).
>
> As for the small files, you will have to deal with them if you intend to
> write each batch immediately. Alternatively, what we sometimes do is:
>
> 1. Buffer a couple of iterations (some 30-40 seconds) in the application
>    until we have substantial data, and then write it to disk.
> 2. Push the small batches back to Kafka and let a different job handle
>    the save to disk.

--
Thanks
Raju Bairishetti
www.lazada.com
Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint
Hi Raju,

Could you please explain the behavior you expect from the DStream? The DStream will have events only from the 'fromOffsets' that you provided in createDirectStream (which I think is the expected behavior).

As for the small files, you will have to deal with them if you intend to write each batch immediately. Alternatively, what we sometimes do is:

1. Buffer a couple of iterations (some 30-40 seconds) in the application until we have substantial data, and then write it to disk.
2. Push the small batches back to Kafka and let a different job handle the save to disk.

On Sat, Jan 23, 2016 at 7:01 PM, Raju Bairishetti wrote:
> Thanks for the quick reply.
> I am creating the Kafka DStream by passing an offsets map. I pasted the
> code snippet in my earlier mail. Let me know if I am missing something.
>
> I want to use the Spark checkpoint for handling only driver/executor
> failures.
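Option 1 in the reply above (collect roughly 30 seconds of data before writing) could be sketched with a tumbling window plus coalesce, as below. This is only one way to do it and not necessarily what Yash's team runs. The object name, the `outputPath` helper and the directory layout are hypothetical, and the stream is assumed to already carry the message values as strings.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, Time}
import org.apache.spark.streaming.dstream.DStream

object BatchedWrites {
  // Hypothetical layout: one output directory per 30-second window.
  def outputPath(baseDir: String, time: Time): String =
    s"$baseDir/batch-${time.milliseconds}"

  def saveEvery30Seconds(events: DStream[String], baseDir: String): Unit = {
    events
      // Window length equals slide interval, so each event is written once.
      .window(Seconds(30), Seconds(30))
      .foreachRDD { (rdd: RDD[String], time: Time) =>
        if (!rdd.isEmpty()) {
          // coalesce(1) merges all partitions, giving one part file per window.
          rdd.coalesce(1).saveAsTextFile(outputPath(baseDir, time))
        }
      }
  }
}
```

Both window arguments must be multiples of the batch interval. With a 1-second batch interval this produces one file every 30 seconds instead of at least one per second.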
Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint
Thanks for the quick reply.

I am creating the Kafka DStream by passing an offsets map. I pasted the code snippet in my earlier mail. Let me know if I am missing something.

I want to use the Spark checkpoint for handling only driver/executor failures.

On Jan 22, 2016 10:08 PM, "Cody Koeninger" wrote:
> Offsets are stored in the checkpoint. If you want to manage offsets
> yourself, don't restart from the checkpoint; specify the starting offsets
> when you create the stream.
>
> Have you read / watched the materials linked from
>
> https://github.com/koeninger/kafka-exactly-once
>
> Regarding the small files problem, either don't use HDFS, or use something
> like filecrush for merging.
Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint
Offsets are stored in the checkpoint. If you want to manage offsets yourself, don't restart from the checkpoint; specify the starting offsets when you create the stream.

Have you read / watched the materials linked from

https://github.com/koeninger/kafka-exactly-once

Regarding the small files problem, either don't use HDFS, or use something like filecrush for merging.

On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti wrote:
> I am creating the DStream with an offset map to read data from Kafka. I am
> deliberately not storing the offsets anywhere, to understand the behavior.
> Whenever I restart the application, the driver is restored from the
> checkpoint as expected, but the DStream does not start from the initial
> offsets. It is created with the last consumed offsets instead of starting
> from offset 0 for each topic partition, even though I am not storing the
> offsets anywhere.
>
> As per my understanding, every RDD or RDD partition will be stored in HDFS
> as a file if I choose HDFS as the output. With a 1-second batch interval I
> will end up with a huge number of small files in HDFS, which leads to lots
> of other issues. Is there any way to write multiple RDDs into a single
> file?
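The advice above (do not restore from the checkpoint, pass the starting offsets yourself) could look roughly like the sketch below, assuming the Spark 1.x direct Kafka API from this thread. `loadOffsets` and `saveOffsets` are hypothetical stand-ins for a durable store such as a database, and the broker address is a placeholder. The in-memory variable used here does not survive a restart and only marks where real persistence would go.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object SelfManagedOffsets {
  // Hypothetical offset store; replace with durable storage.
  @volatile private var stored: Map[TopicAndPartition, Long] =
    Map(TopicAndPartition("sample_sample3_json", 0) -> 0L)

  def loadOffsets(): Map[TopicAndPartition, Long] = stored
  def saveOffsets(offsets: Map[TopicAndPartition, Long]): Unit = stored = stored ++ offsets

  // The next starting offset of each partition is the batch's untilOffset.
  def nextOffsets(ranges: Array[OffsetRange]): Map[TopicAndPartition, Long] =
    ranges.map(r => TopicAndPartition(r.topic, r.partition) -> r.untilOffset).toMap

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("self-managed-offsets")
    // Plain constructor instead of getOrCreate: no offsets come from a checkpoint.
    val ssc = new StreamingContext(conf, Seconds(1))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // assumed address

    val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd.message()
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
      StringDecoder, String](ssc, kafkaParams, loadOffsets(), messageHandler)

    stream.foreachRDD { rdd =>
      // Offset ranges must be read from the RDD produced by the direct stream.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach(println) // placeholder for the real output step
      // Record progress only after the output step has finished.
      saveOffsets(nextOffsets(ranges))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Saving the offsets only after the output step means that a failure in between replays that batch on restart. That gives at-least-once delivery, so the end system has to tolerate duplicates.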
[Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint
Hi,

I am very new to Spark and Spark Streaming. I am planning to use Spark Streaming for real-time processing.

I have created a streaming context and am checkpointing to an HDFS directory for recovery in case of executor and driver failures.

I am creating the DStream with an offset map to read data from Kafka. I am deliberately not storing the offsets anywhere, to understand the behavior. Whenever I restart the application, the driver is restored from the checkpoint as expected, but the DStream does not start from the initial offsets. It is created with the last consumed offsets instead of starting from offset 0 for each topic partition, even though I am not storing the offsets anywhere.

    import kafka.common.TopicAndPartition
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.KafkaUtils

    def main(args: Array[String]): Unit = {
      // Restores the context from the checkpoint if one exists;
      // creatingFunc is called only when no checkpoint data is found.
      val sparkStreamingContext =
        StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
          () => creatingFunc())

      ...
    }

    // Must return the new StreamingContext for getOrCreate to accept it.
    def creatingFunc(): StreamingContext = {

      ...

      // Start partition 0 of the topic at offset 0.
      val offsets: Map[TopicAndPartition, Long] =
        Map(TopicAndPartition("sample_sample3_json", 0) -> 0L)

      KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder, String](sparkStreamingContext, kafkaParams, offsets, messageHandler)

      ...
    }

I want control over offset management at the event level rather than the RDD level, to guarantee at-least-once delivery to the end system.

As per my understanding, every RDD or RDD partition will be stored in HDFS as a file if I choose HDFS as the output. With a 1-second batch interval I will end up with a huge number of small files in HDFS, which leads to lots of other issues. Is there any way to write multiple RDDs into a single file? I don't have much idea about *coalesce* usage. In the worst case, I can merge all the small files in HDFS at regular intervals.

Thanks...

--
Thanks
Raju Bairishetti
www.lazada.com