Actually, we currently use a similar mechanism in ESB-analytics as well. There we keep five tables, one for each time unit (second, minute, hour, day, month), and each has a windowOffset corresponding to its time unit, e.g. 60 for minute, 3600 for hour, etc. We store only the min, max, sum and count values. So if we need the average at a given time, we divide the sum by the count.
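
To illustrate the idea, here is a rough Python sketch of keeping only min, max, sum and count per time window and deriving the average on demand. It is not the ESB-analytics code: the names (WINDOW_SECONDS, Aggregate, window_start, record) are made up for this example, the tables are plain in-memory dicts, and the month table is left out because a month has no fixed length in seconds.

```python
from dataclasses import dataclass

# Hypothetical window lengths in seconds, one per time-unit table.
# "month" is omitted here because its length varies.
WINDOW_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


@dataclass
class Aggregate:
    """Only min, max, sum and count are stored; the average is derived."""
    minimum: float
    maximum: float
    total: float
    count: int

    def add(self, value):
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)
        self.total += value
        self.count += 1

    @property
    def average(self):
        return self.total / self.count


def window_start(timestamp, unit):
    """Floor a timestamp (in seconds) to the start of its window."""
    size = WINDOW_SECONDS[unit]
    return timestamp - timestamp % size


def record(tables, timestamp, value):
    """Fold one event into the matching window of every time-unit table."""
    for unit in WINDOW_SECONDS:
        table = tables.setdefault(unit, {})
        key = window_start(timestamp, unit)
        if key in table:
            table[key].add(value)
        else:
            table[key] = Aggregate(value, value, value, 1)
    return tables


tables = {}
for ts, value in [(3600, 10.0), (3630, 20.0), (3700, 60.0)]:
    record(tables, ts, value)

hour = tables["hour"][3600]
print(hour.minimum, hour.maximum, hour.average)
```

Because sum and count are kept separately, a late event only requires updating the affected window's row; the average never has to be recomputed from the raw events.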

On Fri, Mar 25, 2016 at 8:47 AM, Inosh Goonewardena <in...@wso2.com> wrote:
>
> On Thu, Mar 24, 2016 at 9:56 PM, Sachith Withana <sach...@wso2.com> wrote:
>
>> Hi Inosh,
>>
>> We wouldn't have to do that IMO.
>>
>> We can persist the total aggregate value upto currentTimeWindow -
>> WindowOffset, along with the previous time window aggregation meta data as
>> well ( count, sum in the average aggregation case).
>
> Yep. That will do.
>
>> The previous total wouldn't be calculated again, it's the last two time
>> windows ( including the current one) that we need to recalculate and add it
>> to the previous total.
>>
>> It works almost the same way as the current incremental processing table,
>> but keeping more meta_data on the aggregation related values.
>>
>> @Anjana WDYT?
>>
>> Thanks,
>> Sachith
>>
>> On Fri, Mar 25, 2016 at 7:07 AM, Inosh Goonewardena <in...@wso2.com>
>> wrote:
>>
>>> Hi Sachith,
>>>
>>>> *WindowOffset*:
>>>>
>>>> Events might arrive late that would belong to a previous processed time
>>>> window. To account to that, we have added an optional parameter that would
>>>> allow to
>>>> process immediately previous time windows as well ( acts like a buffer).
>>>> ex: If this is set to 1, apart from the to-be-processed data, data
>>>> related to the previously processed time window will also be taken for
>>>> processing.
>>>
>>> This means, if window offset is set, already processed data will be
>>> processed again. How does the aggregate functions works in this case?
>>> Actually, what is the plan on supporting aggregate functions? Let's take
>>> average function as an example. Are we going to persist sum and count
>>> values per time windows and re-calculate whole average based on values of
>>> all time windows? Is so, I would guess we can update previously processed
>>> time windows sum and count values.
>>>
>>> On Thu, Mar 24, 2016 at 3:50 AM, Sachith Withana <sach...@wso2.com>
>>> wrote:
>>>
>>>> Hi Srinath,
>>>>
>>>> Please find the comments inline.
>>>>
>>>> On Thu, Mar 24, 2016 at 11:39 AM, Srinath Perera <srin...@wso2.com>
>>>> wrote:
>>>>
>>>>> Hi Sachith, Anjana,
>>>>>
>>>>> +1 for the backend model.
>>>>>
>>>>> Are we handling the case when last run was done, 25 minutes of data is
>>>>> processed. Basically, next run has to re-run last hour and update the
>>>>> value.
>>>>
>>>> Yes. It will recalculate for that hour and will update the value.
>>>>
>>>>> When does one hour counting starts? is it from the moment server
>>>>> starts? That will be probabilistic when you restart. I think we need to
>>>>> either start with know place ( midnight) or let user configure it.
>>>>
>>>> In the first run all the data available are processed.
>>>> After that it calculates the floor of last processed events' timestamp
>>>> and gets the floor value (timestamp - timestamp%3600), that would be used
>>>> as the start of the time windows.
>>>>
>>>>> I am bit concern about the syntax though. This only works for very
>>>>> specific type of queries ( that includes aggregate and a group by). What
>>>>> happen if user do this with a different query? Can we give clear error
>>>>> message?
>>>>
>>>> Currently the error messages are very generic. We will have to work on
>>>> it to improve those messages.
>>>>
>>>> Thanks,
>>>> Sachith
>>>>
>>>>> --Srinath
>>>>>
>>>>> On Mon, Mar 21, 2016 at 5:15 PM, Sachith Withana <sach...@wso2.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> We are adding incremental processing capability to Spark in DAS.
>>>>>> As the first stage, we added time slicing to Spark execution.
>>>>>>
>>>>>> Here's a quick introduction into that.
>>>>>>
>>>>>> *Execution*:
>>>>>>
>>>>>> In the first run of the script, it will process all the data in the
>>>>>> given table and store the last processed event timestamp.
>>>>>> Then from the next run onwards it would start processing starting
>>>>>> from that stored timestamp.
>>>>>>
>>>>>> Until the query that contains the data processing part, completes,
>>>>>> last processed event timestamp would not be overridden with the new
>>>>>> value.
>>>>>> This is to ensure that the data processing for the query wouldn't
>>>>>> have to be done again, if the whole query fails.
>>>>>> This is ensured by adding a commit query after the main query.
>>>>>> Refer to the Syntax section for the example.
>>>>>>
>>>>>> *Syntax*:
>>>>>>
>>>>>> In the spark script, incremental processing support has to be
>>>>>> specified per table, this would happen in the create temporary table
>>>>>> line.
>>>>>>
>>>>>> ex: CREATE TEMPORARY TABLE T1 USING CarbonAnalytics options
>>>>>> (tableName "test",
>>>>>> *incrementalProcessing "T1,3600");*
>>>>>>
>>>>>> INSERT INTO T2 SELECT username, age GROUP BY age FROM T1;
>>>>>>
>>>>>> INC_TABLE_COMMIT T1;
>>>>>>
>>>>>> The last line is where it ensures the processing took place
>>>>>> successfully and replaces the lastProcessed timestamp with the new one.
>>>>>>
>>>>>> *TimeWindow*:
>>>>>>
>>>>>> To do the incremental processing, the user has to provide the time
>>>>>> window per which the data would be processed.
>>>>>> In the above example. the data would be summarized in *1 hour *time
>>>>>> windows.
>>>>>>
>>>>>> *WindowOffset*:
>>>>>>
>>>>>> Events might arrive late that would belong to a previous processed
>>>>>> time window. To account to that, we have added an optional parameter that
>>>>>> would allow to
>>>>>> process immediately previous time windows as well ( acts like a
>>>>>> buffer).
>>>>>> ex: If this is set to 1, apart from the to-be-processed data, data
>>>>>> related to the previously processed time window will also be taken for
>>>>>> processing.
>>>>>>
>>>>>> *Limitations*:
>>>>>>
>>>>>> Currently, multiple time windows cannot be specified per temporary
>>>>>> table in the same script.
>>>>>> It would have to be done using different temporary tables.
>>>>>>
>>>>>> *Future Improvements:*
>>>>>> - Add aggregation function support for incremental processing
>>>>>>
>>>>>> Thanks,
>>>>>> Sachith
>>>>>> --
>>>>>> Sachith Withana
>>>>>> Software Engineer; WSO2 Inc.; http://wso2.com
>>>>>> E-mail: sachith AT wso2.com
>>>>>> M: +94715518127
>>>>>> Linked-In: https://lk.linkedin.com/in/sachithwithana
>>>>>
>>>>> --
>>>>> ============================
>>>>> Srinath Perera, Ph.D.
>>>>> http://people.apache.org/~hemapani/
>>>>> http://srinathsview.blogspot.com/
>>>>
>>>> _______________________________________________
>>>> Architecture mailing list
>>>> Architecture@wso2.org
>>>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> Inosh Goonewardena
>>> Associate Technical Lead- WSO2 Inc.
>>> Mobile: +94779966317

--
W.G. Gihan Anuruddha
Senior Software Engineer | WSO2, Inc.
M: +94772272595