Hi All,

I think I have finally figured out a way do the Paul's idea (See the thread
"Unifying CEP and BAM Languages"
http://mail.wso2.org/mailarchive/architecture/2014-May/016366.html for more
details).


Currently event tables in Siddhi are backed by a disk and can be used to
joins

However, you must use it with a stream and it is triggered from a stream.
For example currently *from WeatherStream insert into ..* is not
defined.  *Idea
is to use that for batch processing!*

Proposal is to extend event tables definition

1. Add an timestamp field  to event tables (Then BAM stream also become a
event table)

2. *Define “from” operation on event table to cover batch processing*. For
example

from EvetTable#window.batch(2h) avg(pressure) as avgPressure will run as
batch job with 2h data using data in the event table. (We will execute this
in Spark using Siddhi engine). If #window.batch(2h) is omitted,
#window.batch(5m) is assumed.

3. You can define an event table on top of DB, disk, Cassandra, hdfs, BAM
stream stored etc ..

Let me take an example

Say you want to calculate avg and stddev of pressure once every 24h as a
batch job, and raise an alarm if current pressure is less than 3 stddev
from the mean calculated in batch process. (this is a extended Lambda
architecture scenario) Then you can do all this with Siddhi, and query will
look like following.

*define eventTable WeatherTable using BAMStream … *

*define eventTable  WeatherStatsTable using BAMStream … *

*//batch job*

*from **WeatherTable**#window.batch(24h) stddev(pressure) , avg (pressure)*

*     insert into **WeatherStatsTable**; *

*//use results from batch job in realtime *

*from WeatherStream as  p join **WeatherStatsTable**#window.last() as s *

*          on pressure < s.mean -2*s.stddev*

*     insert into WeatherAlarmStream; *

First query runs once every 24 hours and calculate mean and stddev and
write to disk. That value is joined against the live stream as they come in
via join.

Few more rules (and there are more details that we need to figure out)

1. You read from event table, then it runs as batch processes

2. If you join with event table, it works as now. However, you can define a
window when joining on top of event tables as well. e.g.
WeatherStats#window.batch(5m) means takes events came in last 5 mins.

3. We need to define how it behaves when timestamp field is not defined.
Best is to only support *joins* and not support *from* in that case.

4. When processing batches, it runs in parallel if partitions are defined.
For example, if you want to calculate mean in map reduce style, it will
look like following.

*define partition StationParition WeatherStream.stationID; *

*from WeatherStream#window.batch(24h) avg(pressure) *

*     insert into WeatherMeanStream using StationParition;  *

If no partitions, it will run sequentially (We can improve on this later).

For execution, we want to init siddhi within Spark, and run thing in
parallel using spark, but actual evaluation will done by Siddhi.

5. Users can define other windows like sliding windows with event tables.
However, Siddhi will read data from disk once every 5 minutes or so. So
results will be the same, however, it might come bit later than with
streams.


Please comment

Thanks

Srinath




-- 
============================
Blog: http://srinathsview.blogspot.com twitter:@srinath_perera
Site: http://people.apache.org/~hemapani/
Photos: http://www.flickr.com/photos/hemapani/
Phone: 0772360902
_______________________________________________
Architecture mailing list
Architecture@wso2.org
https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture

Reply via email to