My point is that if you keep the daily aggregates already computed, you do not reprocess the raw data. But yes, you may decide to recompute the last 3 days every day.

On 29 May 2015 23:52, "Igor Berman" <igor.ber...@gmail.com> wrote:
> Hi Ayan,
> Thanks for the response.
> I'm using 1.3.1. I'll check window queries (I don't use Spark SQL, only core; maybe I should?).
> What do you mean by materialized? I can repartitionAndSort the daily aggregation by key, but I don't quite understand how that helps with yesterday's block, which has to be loaded from a file and has no connection to this repartitioning of the daily block.
>
> On 29 May 2015 at 01:51, ayan guha <guha.a...@gmail.com> wrote:
>
>> Which version of Spark? In 1.4, window queries will show up for these kinds of scenarios.
>>
>> One thing I can suggest is to keep the daily aggregates materialised, partitioned by key, and sorted by the key-day combination using the repartitionAndSortWithinPartitions method. It lets you use a custom partitioner and a custom sort.
>>
>> Best
>> Ayan
>>
>> On 29 May 2015 03:31, "igor.berman" <igor.ber...@gmail.com> wrote:
>>
>>> Hi,
>>> I have a daily batch job that computes a daily aggregate of several counters represented by some object.
>>> After the daily aggregation is done, I want to compute a block aggregation over the last N days (3, 7, 30, etc.).
>>> To do so I need to add the new daily aggregation to the current block and then subtract from the current block the daily aggregation of the last day within the block (a sliding window...).
>>> I've implemented it with something like:
>>>
>>> baseBlockRdd.leftOuterJoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)
>>>
>>> All RDDs are keyed by a unique id (long). Each RDD is saved to Avro files after the job finishes and loaded when the job starts (on the next day). baseBlockRdd is much larger than the lastDay and newDay RDDs (depending on the size of the block).
>>>
>>> Unfortunately the performance is not satisfactory due to many shuffles (I have tuned parallelism etc.). I was looking for a way to improve it, to make sure that one task "joins" the same local keys without reshuffling baseBlockRdd (which is big) each time the job starts (see https://spark-project.atlassian.net/browse/SPARK-1061 as a related issue).
>>> So, bottom line: how can I join a big RDD with a smaller RDD without reshuffling the big RDD over and over again?
>>> Once I've saved this big RDD and reloaded it from disk, I want every other RDD to be partitioned and collocated by the same "partitioner" (which is absent for a HadoopRDD)... somehow, so that only the small RDDs are sent over the network.
>>>
>>> Another idea I had: somehow split baseBlock into two parts by filtering on the keys of the small RDDs and then join, however I'm not sure it's possible to implement this filter without a join.
>>>
>>> Any ideas would be appreciated.
>>> Thanks in advance,
>>> Igor
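
[Editor's note] A minimal sketch of the co-partitioned approach discussed in the thread, using the Spark core Scala API. It is an illustration under assumptions, not what was actually run: counters are modelled as a single Long, sc.objectFile stands in for the Avro persistence mentioned above, and the paths and partition count are placeholders. The reloaded block RDD still has to be shuffled once per run to impose the partitioner (the limitation SPARK-1061 discusses), but after that both joins only move the small daily RDDs.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object SlidingBlockJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sliding-block"))
    val partitioner = new HashPartitioner(128) // partition count is illustrative

    // Hypothetical loader for (id -> counter) pairs; objectFile stands in for
    // the Avro files used in the thread.
    def load(path: String) = sc.objectFile[(Long, Long)](path)

    // A reloaded RDD has no partitioner, so one shuffle is paid here to impose
    // it; the persisted, partitioned block is then reused by both joins below.
    val baseBlock = load("/data/block/current")
      .partitionBy(partitioner)
      .persist(StorageLevel.MEMORY_AND_DISK)

    // Shuffle the small RDDs onto the same partitioner instead of the big one.
    val lastDay = load("/data/daily/oldest-in-block").partitionBy(partitioner)
    val newDay  = load("/data/daily/new").partitionBy(partitioner)

    // Sliding-window update: subtract the day that falls out of the block,
    // then add the new day. mapValues (unlike map) preserves the partitioner,
    // so the fullOuterJoin does not reshuffle the big side either.
    val updatedBlock = baseBlock
      .leftOuterJoin(lastDay)
      .mapValues { case (block, oldest) => block - oldest.getOrElse(0L) }
      .fullOuterJoin(newDay)
      .mapValues { case (block, added) => block.getOrElse(0L) + added.getOrElse(0L) }

    updatedBlock.saveAsObjectFile("/data/block/next")
    sc.stop()
  }
}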
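
[Editor's note] Igor's second idea, splitting the big block by the small RDDs' keys without a join, can be sketched with a broadcast key set, reusing sc, baseBlock, lastDay and newDay from the sketch above. Again a hypothetical fragment, and only viable when the daily key set fits comfortably in driver and executor memory.

    // Collect the daily keys to the driver and broadcast them to the executors.
    val dailyKeys = sc.broadcast(
      (lastDay.keys ++ newDay.keys).distinct().collect().toSet)

    // Split the block with plain filters, i.e. without any shuffle; the filters
    // also preserve baseBlock's partitioner, so the slices stay co-partitioned.
    val touched   = baseBlock.filter { case (id, _) => dailyKeys.value.contains(id) }
    val untouched = baseBlock.filter { case (id, _) => !dailyKeys.value.contains(id) }

The join pipeline from the first sketch would then run against the touched slice only (keys that are brand new in newDay still appear through the fullOuterJoin), and the result would be unioned with the untouched slice before being written out.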