Hi Cody ,

Thanks for the clarification. I will try to come up with some workaround.

I have an another doubt. When my job is restarted, and recovers from the
checkpoint it does the re-partitioning step twice for each 15 minute job
until the window of 2 hours is complete. Then the re-partitioning  takes
place only once.

For eg - When the job recovers at 16:15 it does re-partitioning for the
16:15 kafka stream and the 14:15 kafka stream as well. Also, all the other
intermediate stages are computed for 10:00 batch. I am using
reduceByKeyAndWindow with inverse function. Now once the 2 hrs window is
complete i.e at 18:15 repartitioning takes place only once. Seems like the
checkpoint does not have rdd stored for beyond 2 hrs which is my window
duration.  Because of this my job takes more time than usual.

Is there a way or some configuration parameter which would help avoid
repartitioning twice ?

I am attaching the snapshot for the same.

Thanks !!
Kundan

On Fri, Nov 13, 2015 at 8:48 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Unless you change maxRatePerPartition, a batch is going to contain all of
> the offsets from the last known processed to the highest available.
>
> Offsets are not time-based, and Kafka's time-based api currently has very
> poor granularity (it's based on filesystem timestamp of the log segment).
> There's a kafka improvement proposal to add time-based indexing, but I
> wouldn't expect it soon.
>
> Basically, if you want batches to relate to time even while your spark job
> is down, you need an external process to index Kafka and do some custom
> work to use that index to generate batches.
>
> Or (preferably) embed a time in your message, and do any time-based
> calculations using that time, not time of processing.
>
> On Fri, Nov 13, 2015 at 4:36 AM, kundan kumar <iitr.kun...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am using spark streaming check-pointing mechanism and reading the data
>> from kafka. The window duration for my application is 2 hrs with a sliding
>> interval of 15 minutes.
>>
>> So, my batches run at following intervals...
>> 09:45
>> 10:00
>> 10:15
>> 10:30  and so on
>>
>> Suppose, my running batch dies at 09:55 and I restart the application at
>> 12:05, then the flow is something like
>>
>> At 12:05 it would run the 10:00 batch -> would this read the kafka
>> offsets from the time it went down (or 9:45)  to 12:00 ? or  just upto
>> 10:10 ?
>> then next would 10:15 batch - what would be the offsets as input for this
>> batch ? ...so on for all the queued batches
>>
>>
>> Basically, my requirement is such that when the application is restarted
>> at 12:05 then it should read the kafka offsets till 10:00  and then the
>> next queued batch takes offsets from 10:00 to 10:15 and so on until all the
>> queued batches are processed.
>>
>> If this is the way offsets are handled for all the queued batched and I
>> am fine.
>>
>> Or else please provide suggestions on how this can be done.
>>
>>
>>
>> Thanks!!!
>>
>>
>
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to