Thanks for the quick reply.
I am creating the Kafka DStream by passing an offsets map; I have pasted the code
snippet in my earlier mail. Let me know if I am missing something.

I want to use the Spark checkpoint only for handling driver/executor failures.
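For reference, the overall pattern I am aiming for is roughly the following. This is
only a sketch: loadOffsets/saveOffsets stand for an external offset store I have not
built yet, and kafkaParams / sparkStreamingContext are the same as in the snippet in
my earlier mail below.

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    // Starting offsets come from my own store (placeholder), not from the checkpoint.
    val fromOffsets: Map[TopicAndPartition, Long] = loadOffsets()

    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => mmd.message()

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
      StringDecoder, String](sparkStreamingContext, kafkaParams, fromOffsets, messageHandler)

    stream.foreachRDD { rdd =>
      // The direct stream exposes the Kafka offset range consumed in each batch.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the batch, then persist offsetRanges (placeholder store).
      saveOffsets(offsetRanges)
    }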
On Jan 22, 2016 10:08 PM, "Cody Koeninger" <c...@koeninger.org> wrote:

> Offsets are stored in the checkpoint. If you want to manage offsets
> yourself, don't restart from the checkpoint; instead, specify the starting
> offsets when you create the stream.
>
> Have you read / watched the materials linked from
>
> https://github.com/koeninger/kafka-exactly-once
>
> Regarding the small files problem, either don't use HDFS, or use something
> like filecrush for merging.
>
> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <r...@apache.org> wrote:
>
>> Hi,
>>
>>
>>    I am very new to Spark & Spark Streaming. I am planning to use Spark
>> Streaming for real-time processing.
>>
>>    I have created a streaming context and am checkpointing to an HDFS directory
>> for recovery purposes in case of executor & driver failures.
>>
>> I am creating the DStream with an offsets map to get the data from Kafka, and I
>> am deliberately ignoring the consumed offsets just to understand the behavior.
>> Whenever I restart the application, the driver is restored from the checkpoint as
>> expected, but the DStream does not start from the initial offsets. The DStream is
>> created with the last consumed offsets instead of starting from offset 0 for each
>> topic partition, even though I am not storing the offsets anywhere myself.
>>
>> def main(args: Array[String]): Unit = {
>>
>>     // Restore the context from the checkpoint dir if present, else build a new one.
>>     val sparkStreamingContext =
>>       StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>         () => creatingFunc())
>>
>>     ...
>> }
>>
>> def creatingFunc(): StreamingContext = {
>>
>>     ...
>>
>>     // Start every partition of the topic from offset 0.
>>     val offsets: Map[TopicAndPartition, Long] =
>>       Map(TopicAndPartition("sample_sample3_json", 0) -> 0L)
>>
>>     KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
>>       String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>
>>     ...
>> }
>>
>> I want to get control over offset management at the event level, instead of the
>> RDD level, to make sure of at-least-once delivery to the end system.
>>
>> As per my understanding, every RDD (or RDD partition) will be stored in HDFS as a
>> file if I choose to use HDFS as the output. If I use a 1-second batch interval, I
>> will end up with a huge number of small files in HDFS, and having many small files
>> in HDFS leads to lots of other issues.
>> Is there any way to write multiple RDDs into a single file? I don't have much idea
>> about *coalesce* usage. In the worst case, I can merge all the small files in HDFS
>> at regular intervals.
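>> For example, would something along these lines be a reasonable way to cut down the
>> file count per batch? Just a rough, untested sketch, assuming the direct stream is
>> assigned to a val stream and using a placeholder output path:
>>
>>     stream.foreachRDD { (rdd, time) =>
>>       // Collapse to a single partition so each batch writes one file (placeholder path).
>>       rdd.coalesce(1).saveAsTextFile(s"hdfs:///tmp/output/batch-${time.milliseconds}")
>>     }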
>>
>> Thanks...
>>
>> ------
>> Thanks
>> Raju Bairishetti
>> www.lazada.com
>>
>>
>>
>>
>
