Hi Sinthuja,

Please find the comments inline.


On Wed, Mar 23, 2016 at 6:50 PM, Sinthuja Ragendran <sinth...@wso2.com>
wrote:

> Hi Sachith,
>
> On Mon, Mar 21, 2016 at 5:15 PM, Sachith Withana <sach...@wso2.com> wrote:
>
>> Hi all,
>>
>> We are adding incremental processing capability to Spark in DAS.
>> As the first stage, we added time slicing to Spark execution.
>>
>> Here's a quick introduction into that.
>>
>> *Execution*:
>>
>> In the first run of the script, it will process all the data in the given
>> table and store the last processed event timestamp.
>> Then from the next run onwards it would start processing starting from
>> that stored timestamp.
>>
>> Until the query that contains the data processing part, completes, last
>> processed event timestamp would not be overridden with the new value.
>> This is to ensure that the data processing for the query wouldn't have to
>> be done again, if the whole query fails.
>> This is ensured by adding a commit query after the main query.
>> Refer to the Syntax section for the example.
>>
>> *Syntax*:
>>
>> In the spark script, incremental processing support has to be specified
>> per table, this would happen in the create temporary table line.
>>
>> ex: CREATE TEMPORARY TABLE T1 USING CarbonAnalytics options (tableName
>> "test",
>> *incrementalProcessing "T1,3600");*
>>
>
> Why do we need T1 with  incrementalProcessing "T1,3600" ? Isn't having
> only the time duration 3600 is enough given that this is coming under T1
> carbon analytics properties.
>

This is done so that we can pass an ID to the spark RDD where the
incremental processing stuff would happen. We don't have access to the
temporary table name in that context. ( RDD is like an Data frame
abstraction of a table in Spark).


>
> I believe with this, the user needs to create another temporary table if
> the user wants to write incremental query for the same physical table. For
> example, for the below table T1, if I want to do some different analytics
> and insert the results back to table T3, then I have to create another
> temporary table for test.
>

Yes. That's a limitation in the current version. It will fixed in the next
iteration ( as mentioned in the limitations section ).

>
> Is the temporary tables are defined globally? If I had a different script
> and can I define the same temporary table name and pass some different
> incremental processing configuration? IMHO, it should be local to the
> specific script, not global as the users may not have track of all the
> temporary table names they define in other scripts.
>

I think the temporary tables are defined globally. Niranda should be able
to shed some light in to the matter.

>
>> INSERT INTO T2 SELECT username, age GROUP BY age FROM T1;
>>
>> INC_TABLE_COMMIT T1;
>>
>
> Do we need to have the another explicit query to submit the timestamp? It
> may induce to have user errors if he/she forgets to include it. Can't we
> commit the changes after the successful execution of the given actual
> insert query?
>
>
The problem is, to make sure that insertion happened successfully, we'll
have to intercept those details from inside spark itself. I do agree that
it is a hassle to include another step. But right now, for the first
iteration, that's how we can pass the success message to our RDD
implementation without intercepting it from inside spark.


>
>> The last line is where it ensures the processing took place successfully
>> and replaces the lastProcessed timestamp with the new one.
>>
>> *TimeWindow*:
>>
>> To do the incremental processing, the user has to provide the time window
>> per which the data would be processed.
>> In the above example. the data would be summarized in *1 hour *time
>> windows.
>>
>
> Ie, it will process the data from current timestamp-3600  to current time
> stamp?
>

Not exactly. It will get the time window the last processed event belongs
to ( the floor of the timestamp), then optionally you have the ability to
specify whether more time windows from the past should be reprocessed as
well.
Then it will process the data from that calculated last timestamp onwards.


> For example, lets say I have configured the incremental processing, and 1
> and 2nd run was successful, and my analyzer node is down for 3 hours and my
> receiver is collecting the events still. So when I bring back my analyzer
> node, will the next iteration considers the full data from where it has
> processed last or it will consider only last 1 hour data?
>

It will consider the full data set from the last processed time window
onwards ( similar to what I've described in the line above).

Thanks,
Sachith

>
> Thanks,
> Sinthuja.
>
>>
>> *WindowOffset*:
>>
>> Events might arrive late that would belong to a previous processed time
>> window. To account to that, we have added an optional parameter that would
>> allow to
>> process immediately previous time windows as well ( acts like a buffer).
>> ex: If this is set to 1, apart from the to-be-processed data, data
>> related to the previously processed time window will also be taken for
>> processing.
>>
>
>> *Limitations*:
>>
>> Currently, multiple time windows cannot be specified per temporary table
>> in the same script.
>> It would have to be done using different temporary tables.
>>
>>
>>
>> *Future Improvements:*
>> - Add aggregation function support for incremental processing
>>
>>
>> Thanks,
>> Sachith
>> --
>> Sachith Withana
>> Software Engineer; WSO2 Inc.; http://wso2.com
>> E-mail: sachith AT wso2.com
>> M: +94715518127
>> Linked-In: <http://goog_416592669>
>> https://lk.linkedin.com/in/sachithwithana
>>
>> _______________________________________________
>> Architecture mailing list
>> Architecture@wso2.org
>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>
>>
>
>
> --
> *Sinthuja Rajendran*
> Associate Technical Lead
> WSO2, Inc.:http://wso2.com
>
> Blog: http://sinthu-rajan.blogspot.com/
> Mobile: +94774273955
>
>
>
> _______________________________________________
> Architecture mailing list
> Architecture@wso2.org
> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>
>


-- 
Sachith Withana
Software Engineer; WSO2 Inc.; http://wso2.com
E-mail: sachith AT wso2.com
M: +94715518127
Linked-In: <http://goog_416592669>https://lk.linkedin.com/in/sachithwithana
_______________________________________________
Architecture mailing list
Architecture@wso2.org
https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture

Reply via email to