Hi Suho,

Basically, the proposal is to use @parallel=distributed{ queries } to group queries into distributed groups, and then use Storm to distribute the groups across different machines. I think that should work.
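To make the idea a bit more concrete, here is a minimal sketch of how the "parallel" levels named in the quoted proposal ("full", "partition", "single") might be mapped to Storm stream groupings. The class ParallelModeSketch, its methods, and the mapping itself are assumptions made for illustration; they are not part of Siddhi or Storm. Only the grouping names (shuffleGrouping, fieldsGrouping, globalGrouping) refer to real Storm grouping methods, and the sketch returns them as plain strings so that it runs without Storm on the classpath.

```java
import java.util.Locale;

/** Hypothetical helper: maps a "parallel" annotation value to a Storm grouping name. */
final class ParallelModeSketch {

    /** The three parallelism levels named in the proposal. */
    enum ParallelMode { FULL, PARTITION, SINGLE }

    /** Parses an annotation value such as "full" (from @parallel="full"). */
    static ParallelMode parse(String value) {
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "full":      return ParallelMode.FULL;
            case "partition": return ParallelMode.PARTITION;
            case "single":    return ParallelMode.SINGLE;
            default:
                throw new IllegalArgumentException("Unknown parallel mode: " + value);
        }
    }

    /**
     * Assumed mapping (not confirmed by the proposal):
     * FULL      -> shuffleGrouping (events spread evenly over all instances)
     * PARTITION -> fieldsGrouping  (same partition key always reaches the same instance)
     * SINGLE    -> globalGrouping  (all events go to one instance)
     */
    static String groupingFor(ParallelMode mode) {
        switch (mode) {
            case FULL:      return "shuffleGrouping";
            case PARTITION: return "fieldsGrouping";
            case SINGLE:    return "globalGrouping";
            default:
                throw new IllegalStateException("Unhandled mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // Print the assumed grouping for each annotation value.
        for (String value : new String[] {"full", "partition", "single"}) {
            System.out.println(value + " -> " + groupingFor(parse(value)));
        }
    }
}
```

With a mapping along these lines, each {...} group would become one bolt, and the annotation on the group would decide how its input stream is routed to the bolt's instances.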
--Srinath

On Mon, Dec 9, 2013 at 11:41 AM, Sriskandarajah Suhothayan <s...@wso2.com> wrote:

> I'm working on the Siddhi syntax for distributed processing.
>
> We can use the execution plan below for distributed processing.
>
> <?xml version="1.0" encoding="UTF-8"?>
> <executionPlan name="ATMStatsExecutionPlan" statistics="disable"
>     trace="disable" xmlns="http://wso2.org/carbon/eventprocessor">
>   <description>This execution plan is used to identify possible fraud transactions</description>
>   <processMode>local|active-passive|distributed</processMode>
>   <importedStreams>
>     <stream as="atmStatsStream" name="atmStatsStream" version="1.0.0"/>
>   </importedStreams>
>   <queryExpressions>
>     <![CDATA[
>
>     @parallel="full"
>     from atmRowStream[cardType=="Credit"]
>     insert into atmStatsStream
>
>     @parallel="partition"
>     {
>       partition by bankPartition atmStatsStream.cardProvider
>
>       from every a1 = atmStatsStream[amountWithdrawed < 100]
>         -> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo == b1.cardNo]
>       within 1 day
>       select a1.cardNo as cardNo, a1.cardHolderName as cardHolderName,
>         b1.amountWithdrawed as amountWithdrawed, b1.location as location,
>         b1.cardHolderMobile as cardHolderMobile
>       insert into possibleFraudStream
>       partition by bankPartition
>
>       from every a1 = atmStatsStream[amountWithdrawed < 100]
>         -> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo == b1.cardNo]
>       within 1 day
>       select a1.cardNo as cardNo, a1.cardHolderName as cardHolderName,
>         b1.amountWithdrawed as amountWithdrawed, b1.location as location,
>         b1.cardHolderMobile as cardHolderMobile
>       insert into possibleFraudStream
>       partition by bankPartition
>     }
>
>     ]]>
>   </queryExpressions>
>   <exportedStreams>
>     <stream name="possibleFraudStream" valueOf="possibleFraudStream" version="1.0.0"/>
>   </exportedStreams>
> </executionPlan>
>
> Here we'll have three modes of execution:
> 1. local
> 2. active-passive
> 3. distributed
>
> *Local mode*
> This is the one we have now.
>
> *Active-passive*
> Here there will be two nodes, one active and the other passive. A
> handshake protocol between the active and passive nodes will be used for
> state replication, and for syncing when a node goes down and rejoins.
>
> *Distributed*
> Here we use annotations (these are ignored in the other modes). The
> "parallel" annotation denotes the parallelism level: "full" for fully
> distributed, "partition" for distribution according to the partition, or
> "single" for no distribution.
> In the "partition" case, all the queries need to be partitioned by the
> same partition. We can also use curly braces {} to denote a parallelism
> group, which forces all the queries in the group to run on the same
> Siddhi instance.
> We can combine Storm's reliable messaging and snapshot persistence to
> achieve reliable messaging, but this still needs more investigation.
>
> Currently we'll mainly focus on the active-passive case, as it will
> provide reliable and fault-tolerant message processing easily. At the
> same time, we'll also work on the Storm integration for the distributed
> case.
>
> Thoughts?
>
> Suho
>
> On Wed, Nov 27, 2013 at 11:07 AM, Sanjiva Weerawarana <sanj...@wso2.com> wrote:
>
>> +1 .. excellent job getting this off the ground! I'd love to see the
>> numbers in a real distributed setup :).
>>
>> On Wed, Nov 27, 2013 at 1:47 PM, Srinath Perera <srin...@wso2.com> wrote:
>>
>>> Hi All,
>>>
>>> I have written a Siddhi bolt that you can use to run Siddhi using Storm
>>> in a distributed setup.
>>>
>>> You can create SiddhiBolt(s) from any Siddhi query, like the following.
>>>
>>> // Arguments: stream definitions, queries, and the streams to export.
>>> SiddhiBolt siddhiBolt1 = new SiddhiBolt(
>>>     new String[]{ "define stream PlayStream1 ( sid string, ts long, x double, y double, z double, a double, v double);" },
>>>     new String[]{ "from PlayStream1#window.timeBatch(1sec) select sid, avg(v) as avgV insert into AvgRunPlay;" },
>>>     new String[]{ "AvgRunPlay" });
>>>
>>> Then those bolts can be used within a Storm topology like any other
>>> bolt. However, the names of the components and the streams used in the
>>> CEP queries should match.
>>>
>>> TopologyBuilder builder = new TopologyBuilder();
>>> builder.setSpout("PlayStream1", new FootballDataSpout(), 1);
>>> builder.setBolt("AvgRunPlay", siddhiBolt1, 1).shuffleGrouping("PlayStream1");
>>> // siddhiBolt2 is not defined in this snippet; presumably it is built
>>> // like siddhiBolt1, with a query that reads AvgRunPlay and emits FastRunPlay.
>>> builder.setBolt("FastRunPlay", siddhiBolt2, 1).shuffleGrouping("AvgRunPlay");
>>> builder.setBolt("LeafEacho", new EchoBolt(), 1).shuffleGrouping("FastRunPlay");
>>>
>>> I have done a quick performance test and got about 140K TPS in a local
>>> cluster. We need to test using a distributed setup. Lasantha will
>>> integrate this with the CEP code base.
>>>
>>> Some potential TODOs are:
>>> 1) Write two new bolts for Siddhi that support reliable processing and
>>> transaction processing using Storm constructs (for cases where we need
>>> high reliability while processing).
>>> 2) Integrate this with our data agent so we can send events into the
>>> Storm setup as well.
>>> 3) Extend the Siddhi language to support distributed processing, so the
>>> above topology can be written in the Siddhi language itself.
>>>
>>> If performance is confirmed to be in the same range, given the
>>> stability of Storm, I think we can go with Storm for the planned Siddhi
>>> distributed processing.
>>>
>>> Thanks
>>> Srinath
>>>
>>> Code for the bolt can be found in
>>> https://svn.wso2.org/repos/wso2/people/srinath/projects/siddhiStormIntegration/src/org/wso2/siddhi/storm/SiddhiBolt.java
>>>
>>> Code can be found from
>>> https://svn.wso2.org/repos/wso2/people/srinath/projects/siddhiStormIntegration
>>>
>>> --
>>> ============================
>>> Srinath Perera, Ph.D.
>>> Director, Research, WSO2 Inc.
>>> Visiting Faculty, University of Moratuwa
>>> Member, Apache Software Foundation
>>> Research Scientist, Lanka Software Foundation
>>> Blog: http://srinathsview.blogspot.com/
>>> Photos: http://www.flickr.com/photos/hemapani/
>>> Phone: 0772360902
>>>
>>> _______________________________________________
>>> Architecture mailing list
>>> Architecture@wso2.org
>>> https://mail.wso2.org/cgi-bin/mailman/listinfo/architecture
>>
>> --
>> Sanjiva Weerawarana, Ph.D.
>> Founder, Chairman & CEO; WSO2, Inc.; http://wso2.com/
>> email: sanj...@wso2.com; office: +1 650 745 4499 x5700; cell: +94 77 787 6880 | +1 650 265 8311
>> blog: http://sanjiva.weerawarana.org/
>> Lean . Enterprise . Middleware
>
> --
> *S. Suhothayan*
> Associate Technical Lead,
> *WSO2 Inc.* http://wso2.com
> lean . enterprise . middleware
>
> *cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/ |
> twitter: http://twitter.com/suhothayan |
> linked-in: http://lk.linkedin.com/in/suhothayan*

--
============================
Srinath Perera, Ph.D.
http://people.apache.org/~hemapani/
http://srinathsview.blogspot.com/