GitHub user haoch opened a pull request:
https://github.com/apache/flink/pull/2486
FLINK-4520 Integrate Siddhi as a light-weight Streaming CEP Library
- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed
# Abstract
Siddhi CEP is a lightweight, easy-to-use open source Complex Event
Processing (CEP) engine released as a Java library under the `Apache Software
License v2.0`. It processes events generated by various event sources,
analyzes them, and emits the matching complex events according to
user-specified queries.
__It would be very helpful for Flink users (especially streaming
application developers) to have a library that runs Siddhi CEP queries directly
in a Flink streaming application.__
# Features
* Integrate Siddhi CEP as a stream operator (i.e.
`TupleStreamSiddhiOperator`), supporting rich CEP features such as:
  * Filter
  * Join
  * Aggregation
  * Group by
  * Having
  * Window
  * Conditions and Expressions
  * Pattern processing
  * Sequence processing
  * Event Tables
  ...
* Provide an easy-to-use Siddhi CEP API that integrates with the Flink
DataStream API (see `SiddhiCEP` and `SiddhiStream`)
  * Register a Flink DataStream, associating its native type information with
the Siddhi stream schema; supports POJO, Tuple, primitive types, etc.
  * Connect single or multiple Flink DataStreams with a Siddhi CEP
execution plan
  * Return the output stream as a DataStream whose type is inferred
from the Siddhi stream schema
* Integrate Siddhi runtime state management with Flink state (see
`AbstractSiddhiOperator`)
* Support Siddhi plugin management to extend CEP functions (see
`SiddhiCEP#registerExtension`)
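
To illustrate what "associating native type information with the Siddhi stream schema" could involve, here is a minimal plain-Java sketch that turns field names and Java types into a Siddhi `define stream` statement. It has no Flink or Siddhi dependency. The class `SiddhiSchemaSketch` and its methods are hypothetical names chosen for illustration; the PR's own `SiddhiTypeUtils` may map types differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not the code from this pull request.
class SiddhiSchemaSketch {

    // Map a Java type to a Siddhi attribute type name.
    // Anything unrecognized falls back to Siddhi's generic "object" type (an assumption).
    static String siddhiType(Class<?> type) {
        if (type == String.class) return "string";
        if (type == Integer.class || type == int.class) return "int";
        if (type == Long.class || type == long.class) return "long";
        if (type == Float.class || type == float.class) return "float";
        if (type == Double.class || type == double.class) return "double";
        if (type == Boolean.class || type == boolean.class) return "bool";
        return "object";
    }

    // Build a "define stream" statement; field order follows the map's iteration order.
    static String defineStream(String streamId, Map<String, Class<?>> fields) {
        StringBuilder sb = new StringBuilder("define stream ").append(streamId).append(" (");
        boolean first = true;
        for (Map.Entry<String, Class<?>> field : fields.entrySet()) {
            if (!first) {
                sb.append(", ");
            }
            sb.append(field.getKey()).append(' ').append(siddhiType(field.getValue()));
            first = false;
        }
        return sb.append(");").toString();
    }

    public static void main(String[] args) {
        // Field types here are assumed; the PR description only lists the field names.
        Map<String, Class<?>> fields = new LinkedHashMap<>();
        fields.put("id", Integer.class);
        fields.put("name", String.class);
        fields.put("price", Double.class);
        fields.put("timestamp", Long.class);
        System.out.println(defineStream("inputStream1", fields));
    }
}
```

A `LinkedHashMap` is used so that the attribute order in the generated definition matches the order in which the fields were registered.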
# Test Cases
* [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)
# Example
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

    // Register a custom Siddhi function extension under the name "custom:plus".
    cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);

    // Register the two input DataStreams (input1, input2) with their field names.
    cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
    cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");

    DataStream<Tuple5<Integer, String, Integer, String, Double>> output = cep
        .from("inputStream1").union("inputStream2")
        .sql(
            "from every s1 = inputStream1[id == 2] "
                + " -> s2 = inputStream2[id == 3] "
                + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, "
                + "custom:plus(s1.price, s2.price) as price "
                + "insert into outputStream"
        )
        .returns("outputStream");

    env.execute();
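
To make the pattern query in the example more concrete, the following plain-Java sketch simulates `every s1 = inputStream1[id == 2] -> s2 = inputStream2[id == 3]` over an in-memory list of events, with no Flink or Siddhi dependency. It is a simplified reading of the "followed by" semantics: each matching `s1` starts a pending partial match, and the next matching `s2` completes all pending matches. The `Event` class and `match` method are hypothetical, and `custom:plus` is assumed to be plain addition; the real engine handles many cases this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not how Siddhi evaluates patterns internally.
class FollowedByPatternSketch {

    static final class Event {
        final String stream;
        final int id;
        final String name;
        final double price;

        Event(String stream, int id, String name, double price) {
            this.stream = stream;
            this.id = id;
            this.name = name;
            this.price = price;
        }
    }

    // Returns one "id_1,name_1,id_2,name_2,price" row per completed match.
    static List<String> match(List<Event> events) {
        List<Event> pending = new ArrayList<>(); // s1 matches still waiting for an s2
        List<String> output = new ArrayList<>();
        for (Event e : events) {
            if (e.stream.equals("inputStream1") && e.id == 2) {
                pending.add(e); // "every": each matching s1 starts a new partial match
            } else if (e.stream.equals("inputStream2") && e.id == 3) {
                for (Event s1 : pending) {
                    // custom:plus is assumed here to add the two prices
                    output.add(s1.id + "," + s1.name + "," + e.id + "," + e.name
                        + "," + (s1.price + e.price));
                }
                pending.clear(); // completed matches are not reused
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("inputStream1", 2, "a", 10.0),
            new Event("inputStream1", 1, "x", 1.0),  // filtered out: id != 2
            new Event("inputStream2", 3, "b", 2.5),
            new Event("inputStream2", 3, "c", 1.0)); // no pending s1 remains
        for (String row : match(events)) {
            System.out.println(row);
        }
    }
}
```

With the sample events in `main`, only the first `inputStream1` event and the first `inputStream2` event pair up, so a single row is produced.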
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/haoch/flink FLINK-4520
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2486.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2486
----
commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0
Author: Hao Chen <[email protected]>
Date: 2016-08-29T12:34:42Z
Implement initial version of flink-siddhi stream operator
commit d8b131d9204e9e359afae80087afe2aecab27eaf
Author: Chen, Hao <[email protected]>
Date: 2016-08-30T14:33:59Z
Implement SiddhiStream API and DSL
commit 56c150fadc54d06c186223e43a6fd9ac74eee837
Author: Chen, Hao <[email protected]>
Date: 2016-08-30T18:31:18Z
Reformat code following checkstyle
commit 138b0833715a7d110ddb4562e83fb6f0d5338fea
Author: Chen, Hao <[email protected]>
Date: 2016-08-31T17:41:52Z
Add SiddhiTypeUtils and support stream as typed tuple
commit 2484c998bd4def5330012bd6797fb807939752b7
Author: Chen, Hao <[email protected]>
Date: 2016-09-07T17:10:03Z
Support siddhi stream union
commit 0a12f7d3ddad1d0fa7a216524212f777d722a3f7
Author: Chen, Hao <[email protected]>
Date: 2016-09-07T17:40:31Z
Support siddhi extension and registery
commit 9dc425960b212e426e50683778aa3939492669e2
Author: Chen, Hao <[email protected]>
Date: 2016-09-07T18:02:55Z
Fix import checkstyle
commit e618bb2774bd0d6af77d11f65c2200cf18253d1f
Author: Chen, Hao <[email protected]>
Date: 2016-09-08T15:29:45Z
Refactor registerExtension interface
commit 13d76b6c16ca538ff8113352310f9568035eb92a
Author: Chen, Hao <[email protected]>
Date: 2016-09-08T17:55:41Z
Decouple SiddhiCEP API to environment instance isolation level
commit 26498dee5fefd6520701f6ee23441968642656a8
Author: Chen, Hao <[email protected]>
Date: 2016-09-08T18:00:19Z
Add unit test testTriggerUndefinedStreamException
commit 23dabddf1f20665161861c6757c48c5ae0b0bb08
Author: Chen, Hao <[email protected]>
Date: 2016-09-08T18:22:18Z
Decouple data stream converter with streamId
commit 8bbf6f3417e0c177638301c18379c8aadc3746ee
Author: Chen, Hao <[email protected]>
Date: 2016-09-09T08:16:51Z
Sorting buffer queue state persistence and restore logic, support return
Map<String,Object>
commit fcb4a2c85abd604965f2e375d767f6dbd2711222
Author: Chen, Hao <[email protected]>
Date: 2016-09-09T09:28:24Z
Fix checkstyle error
commit 466aafca86899c89b7b6e6d25f844880f9513c51
Author: Chen, Hao <[email protected]>
Date: 2016-09-09T17:34:11Z
Reuse StreamRecord object while collecting output
commit f7f9289557b1da3304ea3d43f7d2441ff314a755
Author: Chen, Hao <[email protected]>
Date: 2016-09-09T18:34:30Z
Add flink-sddhi doc in package-info.java
----