Hi Cody,

Thanks for the clarification. I will try to come up with a workaround.

I have another question. When my job is restarted and recovers from the checkpoint, it does the re-partitioning step twice for each 15-minute batch until the 2-hour window is complete. After that, the re-partitioning takes place only once.

For example, when the job recovers at 16:15, it does re-partitioning for the 16:15 Kafka stream and for the 14:15 Kafka stream as well. Also, all the other intermediate stages are computed for the 10:00 batch. I am using reduceByKeyAndWindow with an inverse function. Once the 2-hour window is complete, i.e. at 18:15, repartitioning takes place only once.

It seems the checkpoint does not have RDDs stored beyond 2 hours, which is my window duration. Because of this, my job takes more time than usual.

Is there a way, or some configuration parameter, that would help avoid repartitioning twice? I am attaching a snapshot showing this.

Thanks!!
Kundan

On Fri, Nov 13, 2015 at 8:48 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Unless you change maxRatePerPartition, a batch is going to contain all of
> the offsets from the last known processed to the highest available.
>
> Offsets are not time-based, and Kafka's time-based api currently has very
> poor granularity (it's based on filesystem timestamp of the log segment).
> There's a kafka improvement proposal to add time-based indexing, but I
> wouldn't expect it soon.
>
> Basically, if you want batches to relate to time even while your spark job
> is down, you need an external process to index Kafka and do some custom
> work to use that index to generate batches.
>
> Or (preferably) embed a time in your message, and do any time-based
> calculations using that time, not time of processing.
>
> On Fri, Nov 13, 2015 at 4:36 AM, kundan kumar <iitr.kun...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am using spark streaming check-pointing mechanism and reading the data
>> from kafka. The window duration for my application is 2 hrs with a sliding
>> interval of 15 minutes.
>>
>> So, my batches run at following intervals...
>> 09:45
>> 10:00
>> 10:15
>> 10:30 and so on
>>
>> Suppose, my running batch dies at 09:55 and I restart the application at
>> 12:05, then the flow is something like
>>
>> At 12:05 it would run the 10:00 batch -> would this read the kafka
>> offsets from the time it went down (or 9:45) to 12:00 ? or just up to
>> 10:10 ?
>> then next would 10:15 batch - what would be the offsets as input for this
>> batch ? ...so on for all the queued batches
>>
>> Basically, my requirement is such that when the application is restarted
>> at 12:05 then it should read the kafka offsets till 10:00 and then the
>> next queued batch takes offsets from 10:00 to 10:15 and so on until all the
>> queued batches are processed.
>>
>> If this is the way offsets are handled for all the queued batches, then I
>> am fine.
>>
>> Or else please provide suggestions on how this can be done.
>>
>> Thanks!!!
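
To make the reduceByKeyAndWindow question above concrete, here is a minimal Python sketch of how a window update with an inverse function works in general. It uses plain dictionaries, not Spark code, and assumes each 15-minute batch has already been reduced to per-key counts. The function names and the sample data are hypothetical. Under this model, producing the window that ends at 16:15 needs two batches: the one entering the window (16:15) and the one leaving it (14:15). If the older batch is no longer available after a restart, it would have to be recomputed, which may be why two re-partitioning steps show up until a full window has passed.

```python
WINDOW_BATCHES = 8  # 2-hour window divided by the 15-minute slide


def reduce_batch(records):
    """Per-batch reduce: count the records seen for each key."""
    counts = {}
    for key in records:
        counts[key] = counts.get(key, 0) + 1
    return counts


def full_window(batches):
    """Recompute a window from scratch by summing every batch in it."""
    result = {}
    for batch in batches:
        for key, n in batch.items():
            result[key] = result.get(key, 0) + n
    return result


def slide_window(prev_window, entering, leaving):
    """Incremental update: previous window + entering batch - leaving batch."""
    result = dict(prev_window)
    for key, n in entering.items():      # reduce function (addition)
        result[key] = result.get(key, 0) + n
    for key, n in leaving.items():       # inverse function (subtraction)
        result[key] = result.get(key, 0) - n
    # Drop keys whose count fell to zero.
    return {k: v for k, v in result.items() if v != 0}


# Hypothetical data: nine 15-minute batches labelled 14:15 through 16:15.
raw = [
    ["a", "b"], ["a"], ["b", "b"], ["c"], ["a", "c"],
    ["b"], ["a"], ["c", "c"], ["a", "b"],
]
batches = [reduce_batch(r) for r in raw]

# Window ending at 16:00 covers batches 14:15 .. 16:00.
window_1600 = full_window(batches[0:WINDOW_BATCHES])

# Window ending at 16:15 needs the 16:15 batch and the 14:15 batch.
window_1615 = slide_window(window_1600, entering=batches[8], leaving=batches[0])

# The incremental result matches a full recomputation over 14:30 .. 16:15.
assert window_1615 == full_window(batches[1:9])
```

The point of the sketch is only that the inverse function always needs the batch that is sliding out of the window, so an update can touch data that is a full window duration old.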