My point is that if you keep the daily aggregates already computed, you do
not need to reprocess the raw data. But yes, you may decide to recompute the
last 3 days every day.
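
A minimal sketch of that idea, in plain Python rather than Spark, assuming each stored daily aggregate is a simple mapping from id to a counter. The names `daily`, `window_from_days` and `slide_window` and the sample numbers are hypothetical, made up for illustration only. It compares recomputing a 3-day block from the stored daily aggregates with the incremental "subtract the dropped day, add the new day" update:

```python
from collections import Counter

def window_from_days(daily, days):
    """Recompute a block by summing the stored daily aggregates for `days`."""
    total = Counter()
    for day in days:
        total.update(daily[day])  # Counter.update adds counts per key
    return {k: v for k, v in total.items() if v != 0}

def slide_window(block, dropped_day, new_day):
    """Update a block in place of a recompute: subtract the day that
    leaves the window, then add the new day."""
    result = Counter(block)
    result.subtract(dropped_day)
    result.update(new_day)
    return {k: v for k, v in result.items() if v != 0}

# Hypothetical daily aggregates, keyed by day and then by id.
daily = {
    "2015-05-26": {1: 5, 2: 1},
    "2015-05-27": {1: 2},
    "2015-05-28": {2: 4, 3: 7},
    "2015-05-29": {1: 1, 3: 3},
}
days = sorted(daily)

old_block = window_from_days(daily, days[0:3])    # 26th to 28th
recomputed = window_from_days(daily, days[1:4])   # 27th to 29th, from scratch
incremental = slide_window(old_block, daily[days[0]], daily[days[3]])

# Both approaches give the same 3-day block.
assert recomputed == incremental
```

Recomputing only touches the few small daily aggregates, so in this sketch the large stored block never has to be loaded or joined. Whether that is cheaper in the real job depends on the block size (3 vs 30 days) and on how the daily files are partitioned.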
On 29 May 2015 23:52, "Igor Berman" <igor.ber...@gmail.com> wrote:

> Hi Ayan,
> thanks for the response.
> I'm using 1.3.1. I'll check window queries (I don't use spark-sql, only
> core; maybe I should?).
> What do you mean by materialized? I can repartition and sort the daily
> aggregation by key, but I don't quite understand how that helps with
> yesterday's block, which needs to be loaded from file and has no
> connection to this repartitioning of the daily block.
>
>
> On 29 May 2015 at 01:51, ayan guha <guha.a...@gmail.com> wrote:
>
>> Which version of Spark? In 1.4, window queries will show up for these kinds
>> of scenarios.
>>
>> One thing I can suggest is to keep the daily aggregates materialised,
>> partitioned by key, and sorted by key-day combination using the
>> repartitionAndSortWithinPartitions method. It allows you to use a custom
>> partitioner and a custom sort order.
>>
>> Best
>> Ayan
>> On 29 May 2015 03:31, "igor.berman" <igor.ber...@gmail.com> wrote:
>>
>>> Hi,
>>> I have a daily batch job that computes a daily aggregate of several
>>> counters represented by some object.
>>> After the daily aggregation is done, I want to compute an aggregation over
>>> a block of several days (3, 7, 30, etc.).
>>> To do so, I need to add the new daily aggregation to the current block and
>>> then subtract from the current block the daily aggregation of the last day
>>> within the current block (a sliding window).
>>> I've implemented it with something like:
>>>
>>> baseBlockRdd.leftOuterJoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)
>>> All RDDs are keyed by a unique id (long). Each RDD is saved in Avro files
>>> after the job finishes and loaded when the job starts (on the next day).
>>> baseBlockRdd is much larger than the lastDay and newDay RDDs (depending on
>>> the size of the block).
>>>
>>> Unfortunately, the performance is not satisfactory due to many shuffles (I
>>> have parallelism configured, etc.). I am looking for a way to improve
>>> performance, to make sure that one task "joins" the same local keys without
>>> reshuffling baseBlockRdd (which is big) each time the job starts (see
>>> https://spark-project.atlassian.net/browse/SPARK-1061 as a related issue).
>>> So, bottom line: how do I join a big RDD with a smaller RDD without
>>> reshuffling the big RDD over and over again?
>>> Once I've saved this big RDD and reloaded it from disk, I want every other
>>> RDD to be partitioned and co-located by the same "partitioner" (which is
>>> absent for HadoopRDD) somehow, so that only the small RDDs are sent over
>>> the network.
>>>
>>> Another idea I had: somehow split baseBlock into 2 parts with a filter by
>>> the keys of the small RDDs and then join. However, I'm not sure it's
>>> possible to implement this filter without a join.
>>>
>>> Any ideas would be appreciated,
>>> thanks in advance
>>> Igor
>>>
>>>
>>>
