Hi Suho,

Basically, the proposal is to use @parallel=distributed{ queries } to group
queries into distributed groups and then use Storm to distribute the groups
across different machines. I think that should work.
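
As a rough sketch of the grouping step only, assuming the annotation is written exactly as @parallel=<mode>{ ... }, queries end with ';', and no query contains a '}' (all assumptions, the syntax is not settled). ParallelGroupParser and Group are made-up names for illustration, not Siddhi or Storm classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of Siddhi or Storm. */
final class ParallelGroupParser {

    /** One annotated group: its mode (e.g. "distributed") and the queries inside the braces. */
    static final class Group {
        final String mode;
        final List<String> queries;

        Group(String mode, List<String> queries) {
            this.mode = mode;
            this.queries = queries;
        }
    }

    // Matches "@parallel=<mode> { ... }"; assumes no '}' appears inside the queries.
    private static final Pattern GROUP =
            Pattern.compile("@parallel\\s*=\\s*(\\w+)\\s*\\{([^}]*)\\}");

    /** Extracts every annotated group; each group would then be deployed as one unit. */
    static List<Group> parse(String plan) {
        List<Group> groups = new ArrayList<>();
        Matcher m = GROUP.matcher(plan);
        while (m.find()) {
            List<String> queries = new ArrayList<>();
            // Queries are assumed to be separated by ';'.
            for (String q : m.group(2).split(";")) {
                String trimmed = q.trim();
                if (!trimmed.isEmpty()) {
                    queries.add(trimmed);
                }
            }
            groups.add(new Group(m.group(1), queries));
        }
        return groups;
    }
}
```

Each Group returned by parse() could then be handed to one bolt, so Storm only has to place whole groups on machines.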

--Srinath


On Mon, Dec 9, 2013 at 11:41 AM, Sriskandarajah Suhothayan <s...@wso2.com> wrote:

> I'm working on the Siddhi syntax for the distributed processing
>
> We can use the below execution plan for distributed processing.
>
> <?xml version="1.0" encoding="UTF-8"?>
> <executionPlan name="ATMStatsExecutionPlan" statistics="disable"
>                trace="disable" xmlns="http://wso2.org/carbon/eventprocessor">
>     <description>This execution plan is used to identify the possible
> fraud transaction</description>
>     <processMode>local|active-passive|distributed</processMode>
>     <importedStreams>
>         <stream as="atmStatsStream" name="atmStatsStream" version="1.0.0"/>
>     </importedStreams>
>     <queryExpressions>
>         <![CDATA[
>
>             @parallel="full"
>             from atmRowStream[cardType=="Credit"]
>             insert into atmStatsStream
>
>             @parallel="partition"
>             {
>             partition by bankPartition atmStatsStream.cardProvider
>
>             from every a1 = atmStatsStream[amountWithdrawed < 100]
>             -> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo
> == b1.cardNo]
>             within 1 day
>             select a1.cardNo as cardNo, a1.cardHolderName as
> cardHolderName, b1.amountWithdrawed as amountWithdrawed, b1.location as
> location, b1.cardHolderMobile as cardHolderMobile
>             insert into possibleFraudStream
>             partition by bankPartition
>
>             from every a1 = atmStatsStream[amountWithdrawed < 100]
>                 -> b1 = atmStatsStream[amountWithdrawed > 10000 and
> a1.cardNo == b1.cardNo]
>             within 1 day
>             select a1.cardNo as cardNo, a1.cardHolderName as
> cardHolderName, b1.amountWithdrawed as amountWithdrawed, b1.location as
> location, b1.cardHolderMobile as cardHolderMobile
>             insert into possibleFraudStream
>             partition by bankPartition
>             }
>
>         ]]>
>         </queryExpressions>
>     <exportedStreams>
>         <stream name="possibleFraudStream" valueOf="possibleFraudStream"
> version="1.0.0"/>
>     </exportedStreams>
> </executionPlan>
>
> Here we'll have three modes of execution
> 1. local
> 2. active-passive
> 3. distributed
>
> *Local mode*
> This is the one we have now.
>
> *Active-passive*
> Here there will be 2 nodes, one active and the other passive. There will
> be a handshake protocol between the active and passive nodes, which will be
> used for state replication and for syncing when a node goes down and rejoins.
>
> *Distributed*
> Here we use annotations (these are ignored in the other modes). The
> "parallel" annotation denotes the parallelism level: it can be "full"
> for fully distributed, "partition" to distribute according to the
> partition, or "single" for no distribution.
> In the "partition" case all the queries need to be partitioned by the same
> partition. We can also use curly braces {} to group queries, thereby
> forcing all the queries in a group to run on the same Siddhi instance.
> We can combine Storm's reliable messaging and snapshot persistence to
> achieve reliable messaging, but this still needs more investigation.
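
One way to read the three "parallel" values is as a rule for picking which Siddhi instance receives an event. The sketch below is only an illustration under that assumption: ParallelMode, fromAnnotation and chooseInstance are hypothetical names, not Siddhi or Storm APIs. "full" spreads events over all instances (roughly what Storm's shuffle grouping does), "partition" sends equal partition keys to the same instance (roughly what fields grouping does), and "single" always uses one instance.

```java
/** Hypothetical sketch; the enum and method names are not Siddhi or Storm APIs. */
enum ParallelMode {
    FULL, PARTITION, SINGLE;

    /** Parses the annotation value, e.g. "full" from @parallel="full". */
    static ParallelMode fromAnnotation(String value) {
        return valueOf(value.trim().toUpperCase(java.util.Locale.ROOT));
    }

    /**
     * Picks the Siddhi instance (0 .. instances-1) for one event.
     * eventSeq is a running event counter; partitionKey is the value of the
     * partition attribute (only used in PARTITION mode).
     */
    int chooseInstance(long eventSeq, String partitionKey, int instances) {
        if (instances <= 0) {
            throw new IllegalArgumentException("instances must be positive");
        }
        switch (this) {
            case FULL:
                // Round-robin over all instances.
                return (int) Math.floorMod(eventSeq, (long) instances);
            case PARTITION:
                // Same key always lands on the same instance.
                return Math.floorMod(partitionKey.hashCode(), instances);
            default:
                // SINGLE: no distribution.
                return 0;
        }
    }
}
```

With the execution plan above, the partition key would be the value of atmStatsStream.cardProvider, so all events of one card provider reach the same instance.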
>
> Currently we'll mainly focus on the Active-passive case, as it will
> provide reliable and fault-tolerant message processing easily, and at the
> same time we'll also work on the Storm integration for the distributed case.
>
> Thoughts?
>
> Suho
>
>
>
>
>
> On Wed, Nov 27, 2013 at 11:07 AM, Sanjiva Weerawarana <sanj...@wso2.com> wrote:
>
>> +1 .. excellent job getting this off the ground! I'd love to see the
>> numbers in a real distributed set up :).
>>
>>
>> On Wed, Nov 27, 2013 at 1:47 PM, Srinath Perera <srin...@wso2.com> wrote:
>>
>>> Hi All,
>>>
>>> I have written a Siddhi bolt that you can use to run Siddhi using Storm
>>> in a distributed setup.
>>>
>>> You can create SiddhiBolt(s) from any Siddhi query, as follows.
>>>
>>> SiddhiBolt siddhiBolt = new SiddhiBolt(
>>>         new String[]{ "define stream PlayStream1 ( sid string, ts long, "
>>>                 + "x double, y double, z double, a double, v double);" },
>>>         new String[]{ "from PlayStream1#window.timeBatch(1sec) select "
>>>                 + "sid, avg(v) as avgV insert into AvgRunPlay;" },
>>>         new String[]{ "AvgRunPlay" });
>>>
>>> Those bolts can then be used within a Storm topology like any other bolt.
>>> However, the names of the components and of the streams used in the CEP
>>> queries must match.
>>>
>>> TopologyBuilder builder = new TopologyBuilder();
>>> builder.setSpout("PlayStream1", new FootballDataSpout(), 1);
>>> builder.setBolt("AvgRunPlay", siddhiBolt1, 1).shuffleGrouping("PlayStream1");
>>> builder.setBolt("FastRunPlay", siddhiBolt2, 1).shuffleGrouping("AvgRunPlay");
>>> builder.setBolt("LeafEacho", new EchoBolt(), 1).shuffleGrouping("FastRunPlay");
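
Since the wiring depends on component names matching stream names, a small check before submitting the topology may help catch typos. This is a minimal sketch under the assumption that each component is described by its name plus the stream names it subscribes to; TopologyNameCheck and danglingInputs are hypothetical names, not part of Storm or the SiddhiBolt code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; not part of Storm or the SiddhiBolt code. */
final class TopologyNameCheck {

    /**
     * components maps a component name to the stream names it reads from
     * (an empty list for a spout). Returns one "consumer <- stream" entry for
     * every subscribed stream name that has no component of the same name.
     */
    static List<String> danglingInputs(Map<String, List<String>> components) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : components.entrySet()) {
            for (String input : e.getValue()) {
                if (!components.containsKey(input)) {
                    problems.add(e.getKey() + " <- " + input);
                }
            }
        }
        return problems;
    }
}
```

For the topology above, the map would hold PlayStream1 with no inputs, AvgRunPlay reading PlayStream1, FastRunPlay reading AvgRunPlay, and LeafEacho reading FastRunPlay; an empty result means every subscription has a matching producer.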
>>>
>>> I have done a quick performance test and got about 140K TPS in a local
>>> cluster. We need to test using a distributed setup. Lasantha will integrate
>>> this with the CEP code base.
>>>
>>> Some potential TODOs are
>>> 1) Write two new bolts for Siddhi that support reliable processing and
>>> transaction processing using Storm constructs. (for cases where we need
>>> high reliability while processing)
>>> 2) Integrate this with our data agent so we can send events into the Storm
>>> setup as well.
>>> 3) Extend the Siddhi language to support distributed processing, so the
>>> above topology can be written in the Siddhi language itself.
>>>
>>> If performance is confirmed to be in the same range, given the stability
>>> of Storm, I think we can go with Storm for the planned Siddhi distributed
>>> processing.
>>>
>>> Thanks
>>> Srinath
>>>
>>> Code for the bolt can be found in
>>> https://svn.wso2.org/repos/wso2/people/srinath/projects/siddhiStormIntegration/src/org/wso2/siddhi/storm/SiddhiBolt.java
>>> .
>>>
>>> Code can be found from
>>> https://svn.wso2.org/repos/wso2/people/srinath/projects/siddhiStormIntegration
>>>
>>> --
>>> ============================
>>> Srinath Perera, Ph.D.
>>>   Director, Research, WSO2 Inc.
>>>   Visiting Faculty, University of Moratuwa
>>>   Member, Apache Software Foundation
>>>   Research Scientist, Lanka Software Foundation
>>>   Blog: http://srinathsview.blogspot.com/
>>>   Photos: http://www.flickr.com/photos/hemapani/
>>>    Phone: 0772360902
>>>
>>> _______________________________________________
>>> Architecture mailing list
>>> Architecture@wso2.org
>>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>>
>>>
>>
>>
>> --
>> Sanjiva Weerawarana, Ph.D.
>> Founder, Chairman & CEO; WSO2, Inc.;  http://wso2.com/
>> email: sanj...@wso2.com; office: +1 650 745 4499 x5700; cell: +94 77 787
>> 6880 | +1 650 265 8311
>> blog: http://sanjiva.weerawarana.org/
>> Lean . Enterprise . Middleware
>>
>>
>>
>
>
> --
>
> *S. Suhothayan*
> Associate Technical Lead,
> *WSO2 Inc.* http://wso2.com
> lean . enterprise . middleware
>
> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/ | twitter:
> http://twitter.com/suhothayan | linked-in:
> http://lk.linkedin.com/in/suhothayan
>
>


-- 
============================
Srinath Perera, Ph.D.
   http://people.apache.org/~hemapani/
   http://srinathsview.blogspot.com/
