Hi Raju,
Could you please explain the behavior you expected from the DStream? The
DStream will have events only from the 'fromOffsets' that you provided in
createDirectStream (which I think is the expected behavior).
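
For reference, a minimal sketch of that (assuming the Spark 1.x direct API
and the names from your snippet; loadOffsets below is a hypothetical helper
for wherever you persist offsets yourself):

import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// The stream starts exactly from the offsets passed in here.
val fromOffsets: Map[TopicAndPartition, Long] = loadOffsets()
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder, String](sparkStreamingContext, kafkaParams, fromOffsets,
  messageHandler)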

Regarding the smaller files: you will have to deal with small files if you
intend to write the data immediately. Alternatively, what we sometimes do is:

1. Buffer a couple of batches in the application for some 30-40 seconds,
until we have substantial data, and only then write it to disk (see the
sketch below).
2. Push the smaller batches back to Kafka, and let a different job handle
the save to disk.
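
For option 1, a rough sketch (assuming a DStream[String] named stream, a
1-second batch interval, and the Spark 1.x API; the durations and output
path are made up):

import org.apache.spark.streaming.Seconds

// Accumulate ~30 seconds of 1-second batches, then write them in one go.
stream.window(Seconds(30), Seconds(30)).foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Fewer partitions => fewer (and larger) output files per write.
    rdd.coalesce(1).saveAsTextFile(s"/data/out-${System.currentTimeMillis()}")
  }
}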

On Sat, Jan 23, 2016 at 7:01 PM, Raju Bairishetti <r...@apache.org> wrote:

> Thanks for the quick reply.
> I am creating the Kafka DStream by passing an offsets map. I have pasted the
> code snippet in my earlier mail. Let me know if I am missing something.
>
> I want to use the Spark checkpoint only for handling driver/executor
> failures.
> On Jan 22, 2016 10:08 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>
>> Offsets are stored in the checkpoint. If you want to manage offsets
>> yourself, don't restart from the checkpoint; specify the starting offsets
>> when you create the stream.
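>>
>> For instance, a sketch of that pattern (stream below is the DStream from
>> createDirectStream; the saveOffsets helper is hypothetical, standing in
>> for wherever you keep your offsets):
>>
>> import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
>>
>> stream.foreachRDD { rdd =>
>>   // The direct stream exposes the exact Kafka offset range of each batch.
>>   val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   // ... process the batch, then persist the offsets yourself ...
>>   saveOffsets(ranges)
>> }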
>>
>> Have you read / watched the materials linked from
>>
>> https://github.com/koeninger/kafka-exactly-once
>>
>> Regarding the small files problem, either don't use HDFS, or use
>> something like filecrush for merging.
>>
>> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti <r...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>>
>>>    I am very new to Spark & Spark Streaming. I am planning to use Spark
>>> Streaming for real-time processing.
>>>
>>>    I have created a streaming context and am checkpointing to an HDFS
>>> directory for recovery purposes in case of executor failures & driver
>>> failures.
>>>
>>> I am creating the DStream with an offset map to get the data from Kafka. I
>>> am simply ignoring the offsets to understand the behavior. Whenever I
>>> restart the application, the driver is restored from the checkpoint as
>>> expected, but the DStream is not getting started from the initial offsets.
>>> The DStream was created with the last consumed offsets instead of starting
>>> from offset 0 for each topic partition, even though I am not storing the
>>> offsets anywhere.
>>>
>>> import kafka.common.TopicAndPartition
>>> import kafka.serializer.StringDecoder
>>> import org.apache.spark.streaming.StreamingContext
>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>
>>> def main(args: Array[String]): Unit = {
>>>
>>>     // Recover the context from the checkpoint directory if one exists,
>>>     // otherwise build a fresh one with creatingFunc().
>>>     val sparkStreamingContext =
>>>       StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>>         () => creatingFunc())
>>>
>>>     ...
>>>
>>> }
>>>
>>> // getOrCreate expects a function that returns the StreamingContext.
>>> def creatingFunc(): StreamingContext = {
>>>
>>>     ...
>>>
>>>     // Intended starting point: offset 0 of partition 0 of the topic.
>>>     val offsets: Map[TopicAndPartition, Long] =
>>>       Map(TopicAndPartition("sample_sample3_json", 0) -> 0L)
>>>
>>>     KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
>>>       String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>>
>>>     ...
>>> }
>>>
>>> I want to get control over offset management at the event level instead of
>>> the RDD level, to make sure of at-least-once delivery to the end system.
>>>
>>> As per my understanding, every RDD (or RDD partition) will be stored in
>>> HDFS as a file if I choose to use HDFS as the output. If I use 1 sec as the
>>> batch interval, then I will end up having a huge number of small files in
>>> HDFS, and having small files in HDFS leads to lots of other issues. Is
>>> there any way to write multiple RDDs into a single file? I don't have much
>>> idea about *coalesce* usage. In the worst case, I can merge all the small
>>> files in HDFS at regular intervals.
>>>
>>> Thanks...
>>>
>>> ------
>>> Thanks
>>> Raju Bairishetti
>>> www.lazada.com
>>>
>>>
>>>
>>>
>>
