[FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition

Implements NFA using the SharedBuffer

Implements NFACompiler to compile a Pattern into a NFA

Add CEP operator

Makes NFA and SharedBuffer serializable

Add serializability support to SharedBuffer and NFA

Add keyed cep pattern operator

Adds CEP documentation

Adds online documentation for the CEP library

Copies sequence events before giving them to the UDF

Fix correct scala type suffixes

This closes #1557.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79058edb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79058edb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79058edb

Branch: refs/heads/master
Commit: 79058edb67095120558add534ba37304425fa602
Parents: 682d8d5
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Jan 14 10:04:23 2016 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Feb 2 15:04:08 2016 +0100

----------------------------------------------------------------------
 docs/libs/cep/index.md                          | 300 +++++++
 flink-libraries/flink-cep/pom.xml               |  77 ++
 .../src/main/java/org/apache/flink/cep/CEP.java | 100 +++
 .../flink/cep/NonDuplicatingTypeSerializer.java | 195 +++++
 .../flink/cep/PatternFlatSelectFunction.java    |  54 ++
 .../apache/flink/cep/PatternSelectFunction.java |  54 ++
 .../org/apache/flink/cep/PatternStream.java     | 151 ++++
 .../apache/flink/cep/nfa/ComputationState.java  |  84 ++
 .../org/apache/flink/cep/nfa/DeweyNumber.java   | 163 ++++
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 406 +++++++++
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 858 +++++++++++++++++++
 .../java/org/apache/flink/cep/nfa/State.java    | 109 +++
 .../apache/flink/cep/nfa/StateTransition.java   |  84 ++
 .../flink/cep/nfa/StateTransitionAction.java    |  28 +
 .../flink/cep/nfa/compiler/NFACompiler.java     | 187 ++++
 .../operator/AbstractCEPPatternOperator.java    | 108 +++
 .../flink/cep/operator/CEPPatternOperator.java  | 137 +++
 .../cep/operator/KeyedCEPPatternOperator.java   | 331 +++++++
 .../cep/operator/StreamRecordComparator.java    |  44 +
 .../flink/cep/pattern/AndFilterFunction.java    |  44 +
 .../flink/cep/pattern/FollowedByPattern.java    |  33 +
 .../org/apache/flink/cep/pattern/Pattern.java   | 168 ++++
 .../cep/pattern/SubtypeFilterFunction.java      |  43 +
 .../java/org/apache/flink/cep/CEPITCase.java    | 406 +++++++++
 .../test/java/org/apache/flink/cep/Event.java   |  77 ++
 .../java/org/apache/flink/cep/StreamEvent.java  |  41 +
 .../java/org/apache/flink/cep/SubEvent.java     |  49 ++
 .../apache/flink/cep/nfa/DeweyNumberTest.java   |  54 ++
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 160 ++++
 .../java/org/apache/flink/cep/nfa/NFATest.java  | 261 ++++++
 .../apache/flink/cep/nfa/SharedBufferTest.java  | 136 +++
 .../flink/cep/nfa/compiler/NFACompilerTest.java | 129 +++
 .../apache/flink/cep/pattern/PatternTest.java   | 145 ++++
 flink-libraries/pom.xml                         |   1 +
 .../streaming/api/operators/StreamOperator.java |   2 +-
 35 files changed, 5218 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/docs/libs/cep/index.md
----------------------------------------------------------------------
diff --git a/docs/libs/cep/index.md b/docs/libs/cep/index.md
new file mode 100644
index 0000000..04e2b73
--- /dev/null
+++ b/docs/libs/cep/index.md
@@ -0,0 +1,300 @@
+---
+title: "FlinkCEP - Complex event processing for Flink"
+# Top navigation
+top-nav-group: libs
+top-nav-pos: 2
+top-nav-title: CEP
+# Sub navigation
+sub-nav-group: batch
+sub-nav-id: flinkcep
+sub-nav-pos: 2
+sub-nav-parent: libs
+sub-nav-title: CEP
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+FlinkCEP is the complex event processing library for Flink.
+It allows you to easily detect complex event patterns in an endless stream of data.
+Complex events can then be constructed from matching sequences.
+This gives you the opportunity to quickly get hold of what's really important in your data.
+
+## Getting Started
+
+If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/apis/batch/index.html#linking-with-flink).
+Next, you have to add the FlinkCEP dependency to the `pom.xml` of your project.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-cep{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that FlinkCEP is currently not part of the binary distribution.
+See [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution) for how to link it for cluster execution.
+
+Now you can start writing your first CEP program using the pattern API.
+
+{% highlight java %}
+DataStream<Event> input = ...
+
+Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(evt -> evt.getId() == 42)
+    .next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0)
+    .followedBy("end").where(evt -> evt.getName().equals("end"));
+
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+DataStream<Alert> result = patternStream.select(match -> {
+    return createAlertFrom(match);
+});
+{% endhighlight %}
+
+Note that we have used Java 8 lambdas here to make the example more succinct.
+
+## The Pattern API
+
+The pattern API allows you to quickly define complex event patterns.
+
+Each pattern consists of multiple stages, which we call states.
+In order to go from one state to the next, the user can specify conditions.
+These conditions can be the contiguity of events or a filter condition on an event.
+
+Each pattern has to start with an initial state:
+
+{% highlight java %}
+Pattern<Event, ?> start = Pattern.<Event>begin("start");
+{% endhighlight %}
+
+Each state must have a unique name to identify the matched events later on.
+Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method.
+  
+{% highlight java %}
+start.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+
+We can also restrict the type of the accepted event to some subtype of the initial event type (here `Event`) via the `subtype` method.
+
+{% highlight java %}
+start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
+    @Override
+    public boolean filter(SubEvent value) {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+
+As can be seen here, the subtype condition can also be combined with an additional filter condition on the subtype.
+In fact, you can always provide multiple conditions by calling `where` and `subtype` multiple times.
+These conditions will then be combined using the logical AND operator.
+
+Next, we can append further states to detect complex patterns.
+We can control the contiguity that two succeeding events need to have in order to be accepted by the pattern.
+
+Strict contiguity means that two matching events have to directly succeed each other.
+This means that no other events can occur in between.
+A strict contiguity pattern state can be created via the `next` method.
+
+{% highlight java %}
+Pattern<Event, ?> strictNext = start.next("middle");
+{% endhighlight %}
+
+Non-strict contiguity means that other events are allowed to occur in between two matching events.
+A non-strict contiguity pattern state can be created via the `followedBy` method.
+
+{% highlight java %}
+Pattern<Event, ?> nonStrictNext = start.followedBy("middle");
+{% endhighlight %}
+
+It is also possible to define a temporal constraint for the pattern to be valid.
+For example, the `within` method defines that a pattern has to occur within 10 seconds.
+
+{% highlight java %}
+nonStrictNext.within(Time.seconds(10));
+{% endhighlight %}
+
+<br />
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 25%">Pattern Operation</th>
+            <th class="text-center">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><strong>Begin</strong></td>
+            <td>
+            <p>Defines a starting pattern state:</p>
+    {% highlight java %}
+    Pattern<Event, ?> start = Pattern.<Event>begin("start");
+    {% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Next</strong></td>
+            <td>
+                <p>Appends a new pattern state. A matching event has to directly succeed the previous matching event:</p>
+{% highlight java %}
+Pattern<Event, ?> next = start.next("next");
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>FollowedBy</strong></td>
+            <td>
+                <p>Appends a new pattern state. Other events can occur between a matching event and the previous matching event:</p>
+{% highlight java %}
+Pattern<Event, ?> next = start.followedBy("next");
+{% endhighlight %}
+            </td>
+        </tr>
+        <tr>
+            <td><strong>Where</strong></td>
+            <td>
+                <p>Defines a filter condition for the current pattern state. An event can match the state only if it passes the filter:</p>
+{% highlight java %}
+patternState.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+            </td>
+        </tr>
+       <tr>
+           <td><strong>Subtype</strong></td>
+           <td>
+               <p>Defines a subtype condition for the current pattern state. An event can match the state only if it is of this subtype:</p>
+{% highlight java %}
+patternState.subtype(SubEvent.class);
+{% endhighlight %}
+           </td>
+       </tr>
+       <tr>
+          <td><strong>Within</strong></td>
+          <td>
+              <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:</p>
+{% highlight java %}
+patternState.within(Time.seconds(10));
+{% endhighlight %}
+          </td>
+      </tr>
+  </tbody>
+</table>
+
+### Detecting Patterns
+
+In order to run a stream of events against your pattern, you have to create a `PatternStream`.
+Given an input stream `input` and a pattern `pattern`, you create the `PatternStream` by calling `CEP.pattern`:
+
+{% highlight java %}
+DataStream<Event> input = ...
+Pattern<Event, ?> pattern = ...
+
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+{% endhighlight %}
+
+### Selecting from Patterns
+
+Once you have obtained a `PatternStream`, you can select from detected event sequences via the `select` or `flatSelect` methods.
+The `select` method requires a `PatternSelectFunction` implementation.
+A `PatternSelectFunction` has a `select` method which is called for each matching event sequence.
+It receives a map from state names to matched events, in which each event is stored under the name of the state it has been matched to.
+The `select` method returns exactly one result.
+
+{% highlight java %}
+class MyPatternSelectFunction implements PatternSelectFunction<Event, Alert> {
+    @Override
+    public Alert select(Map<String, Event> pattern) {
+        // look up the matched events by the names of their states
+        Event startEvent = pattern.get("start");
+        Event endEvent = pattern.get("end");
+
+        return new Alert(startEvent, endEvent);
+    }
+}
+{% endhighlight %}
+
+A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, the only difference being that it can return an arbitrary number of results.
+To do this, its `select` method has an additional `Collector` parameter, which is used to emit the output elements.
+
+{% highlight java %}
+class MyPatternFlatSelectFunction implements PatternFlatSelectFunction<Event, Alert> {
+    @Override
+    public void select(Map<String, Event> pattern, Collector<Alert> collector) {
+        Event startEvent = pattern.get("start");
+        Event endEvent = pattern.get("end");
+
+        // emit several results for one match; getValue() stands for some numeric field of Event
+        for (int i = 0; i < startEvent.getValue(); i++) {
+            collector.collect(new Alert(startEvent, endEvent));
+        }
+    }
+}
+{% endhighlight %}
+
+## Examples
+
+The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data stream of `Event`s.
+The events are keyed by their ids and a valid pattern has to occur within 10 seconds.
+The whole processing is done with event time.
+
+{% highlight java %}
+StreamExecutionEnvironment env = ...
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+DataStream<Event> input = ...
+
+DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
+       @Override
+       public Integer getKey(Event value) throws Exception {
+               return value.getId();
+       }
+});
+
+Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
+       .next("middle").where(new FilterFunction<Event>() {
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("error");
+               }
+       }).followedBy("end").where(new FilterFunction<Event>() {
+               @Override
+               public boolean filter(Event value) throws Exception {
+                       return value.getName().equals("critical");
+               }
+       }).within(Time.seconds(10));
+
+PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
+
+DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
+       @Override
+       public Alert select(Map<String, Event> pattern) throws Exception {
+               return new Alert(pattern.get("start"), pattern.get("end"));
+       }
+});
+{% endhighlight %}
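The contiguity and `within` rules described in the documentation above can be illustrated without Flink. The following plain-Java sketch enumerates all matches of a fixed sequence of states over a finite, time-ordered list of events by naive backtracking. It is not the NFA/SharedBuffer-based implementation added by this commit, and the types `Ev`, `Stage` and `NaiveMatcher` are hypothetical. It assumes that `within` bounds the time between the first and the last matched event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical event type for this sketch: id, name and an event-time timestamp in milliseconds.
final class Ev {
    final int id;
    final String name;
    final long ts;

    Ev(int id, String name, long ts) {
        this.id = id;
        this.name = name;
        this.ts = ts;
    }
}

// One pattern state: its name, its condition and whether it uses strict contiguity ("next").
final class Stage {
    final String name;
    final Predicate<Ev> condition;
    final boolean strict;

    Stage(String name, Predicate<Ev> condition, boolean strict) {
        this.name = name;
        this.condition = condition;
        this.strict = strict;
    }
}

final class NaiveMatcher {
    // Enumerates every match as a map from state name to event, trying each event as a start.
    static List<Map<String, Ev>> match(List<Ev> events, List<Stage> stages, long withinMillis) {
        List<Map<String, Ev>> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            extend(events, stages, withinMillis, 0, i, events.get(i).ts, new LinkedHashMap<>(), matches);
        }
        return matches;
    }

    private static void extend(List<Ev> events, List<Stage> stages, long withinMillis, int stage, int pos,
                               long startTs, Map<String, Ev> partial, List<Map<String, Ev>> matches) {
        Ev event = events.get(pos);
        // discard the candidate if the time window is exceeded or the state's condition fails
        if (event.ts - startTs > withinMillis || !stages.get(stage).condition.test(event)) {
            return;
        }
        Map<String, Ev> extended = new LinkedHashMap<>(partial);
        extended.put(stages.get(stage).name, event);
        if (stage == stages.size() - 1) {
            matches.add(extended);
            return;
        }
        // strict contiguity: only the immediately following event; otherwise any later event
        int end = stages.get(stage + 1).strict ? Math.min(pos + 2, events.size()) : events.size();
        for (int next = pos + 1; next < end; next++) {
            extend(events, stages, withinMillis, stage + 1, next, startTs, extended, matches);
        }
    }

    // start -> middle(name = "error") -> end(name = "critical"), mirroring the documentation example;
    // strictMiddle chooses between next() (true) and followedBy() (false) for the middle state.
    static List<Stage> examplePattern(boolean strictMiddle) {
        Predicate<Ev> any = e -> true;
        return Arrays.asList(
            new Stage("start", any, false),
            new Stage("middle", e -> e.name.equals("error"), strictMiddle),
            new Stage("end", e -> e.name.equals("critical"), false));
    }
}
```

Each returned map plays the role of the `Map<String, Event>` that a `PatternSelectFunction` receives. Several `where`/`subtype` calls on one state would correspond to chaining the state's predicates with `Predicate.and`. For the events a@0, x@500, error@1000 and critical@3000, `examplePattern(true)` yields one match (starting at x), while `examplePattern(false)` yields two, because `followedBy` tolerates the intervening x.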

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml
new file mode 100644
index 0000000..c246077
--- /dev/null
+++ b/flink-libraries/flink-cep/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-libraries</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+       
+       <artifactId>flink-cep</artifactId>
+       <name>flink-cep</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <packaging>jar</packaging>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
new file mode 100644
index 0000000..60e0bf8
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CEPPatternOperator;
+import org.apache.flink.cep.operator.KeyedCEPPatternOperator;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+
+import java.util.Map;
+
+/**
+ * Utility class for complex event processing.
+ *
+ * Methods which transform a {@link DataStream} into a {@link PatternStream} to do CEP.
+ */
+public class CEP {
+       private static final String PATTERN_OPERATOR_NAME = "AbstractCEPPatternOperator";
+
+       /**
+        * Transforms a {@link DataStream} into a {@link PatternStream}. The PatternStream detects
+        * the provided event pattern and emits each detected pattern as a {@link Map} from String to T,
+        * where each event is identified by a String. The String is the name of the {@link State} to
+        * which the event has been associated.
+        *
+        * Depending on the input {@link DataStream} type, keyed vs. non-keyed, a different
+        * {@link org.apache.flink.cep.operator.AbstractCEPPatternOperator} is instantiated. A
+        * non-keyed input is processed by an operator with parallelism 1.
+        *
+        * @param input DataStream containing the input events
+        * @param pattern Pattern specification which shall be detected
+        * @param <T> Type of the input events
+        * @param <K> Type of the key in case of a KeyedStream (necessary to bind keySelector and
+        *            keySerializer to the same type)
+        * @return Resulting pattern stream
+        */
+       public static <T, K> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
+               final TypeSerializer<T> inputSerializer = input.getType().createSerializer(input.getExecutionConfig());
+
+               // check whether we use processing time
+               final boolean isProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+               // compile our pattern into an NFAFactory to instantiate NFAs later on
+               final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer);
+
+               final DataStream<Map<String, T>> patternStream;
+
+               if (input instanceof KeyedStream) {
+                       // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
+                       KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) input;
+
+                       KeySelector<T, K> keySelector = keyedStream.getKeySelector();
+                       TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
+
+                       patternStream = keyedStream.transform(
+                               PATTERN_OPERATOR_NAME,
+                               (TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
+                               new KeyedCEPPatternOperator<>(
+                                       inputSerializer,
+                                       isProcessingTime,
+                                       keySelector,
+                                       keySerializer,
+                                       nfaFactory));
+               } else {
+                       // a non-keyed stream is processed by a single operator instance
+                       patternStream = input.transform(
+                               PATTERN_OPERATOR_NAME,
+                               (TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
+                               new CEPPatternOperator<T>(
+                                       inputSerializer,
+                                       isProcessingTime,
+                                       nfaFactory
+                               )).setParallelism(1);
+               }
+
+               return new PatternStream<>(patternStream, input.getType());
+       }
+}
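`CEP.pattern` uses a `KeyedCEPPatternOperator` for a `KeyedStream` and otherwise a `CEPPatternOperator` running with parallelism 1. Assuming the keyed operator keeps independent matching state per key (its internals are not part of this excerpt), strict contiguity is then evaluated only among the events of a single key. The plain-Java sketch below contrasts the two cases for an "error directly followed by critical" rule. It uses the hypothetical types `KeyedEv` and `StrictPairDetector` and is not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical event type for this sketch.
final class KeyedEv {
    final int id;
    final String name;

    KeyedEv(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Detects "first directly followed by second" (strict contiguity).
// keyOf == null models a non-keyed stream: a single state sees every event.
// Otherwise every key keeps its own "previous event", i.e. one independent matcher per key.
final class StrictPairDetector {
    private final Function<KeyedEv, Integer> keyOf;
    private final Predicate<KeyedEv> first;
    private final Predicate<KeyedEv> second;
    private final Map<Integer, KeyedEv> previousByKey = new HashMap<>();
    private final List<List<KeyedEv>> matches = new ArrayList<>();

    StrictPairDetector(Function<KeyedEv, Integer> keyOf, Predicate<KeyedEv> first, Predicate<KeyedEv> second) {
        this.keyOf = keyOf;
        this.first = first;
        this.second = second;
    }

    void process(KeyedEv event) {
        Integer key = keyOf == null ? null : keyOf.apply(event); // HashMap accepts a null key
        KeyedEv previous = previousByKey.put(key, event);
        if (previous != null && first.test(previous) && second.test(event)) {
            matches.add(Arrays.asList(previous, event));
        }
    }

    List<List<KeyedEv>> matches() {
        return matches;
    }

    // Runs the events through a fresh detector for "error" directly followed by "critical".
    static int countErrorThenCritical(List<KeyedEv> events, boolean keyed) {
        Function<KeyedEv, Integer> keyOf = keyed ? e -> e.id : null;
        StrictPairDetector detector = new StrictPairDetector(
            keyOf,
            e -> e.name.equals("error"),
            e -> e.name.equals("critical"));
        events.forEach(detector::process);
        return detector.matches().size();
    }
}
```

With the events error (id 1), info (id 2) and critical (id 1), the keyed variant reports one match and the non-keyed one reports none, because the interleaved info event breaks strict contiguity in the shared sequence.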

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
new file mode 100644
index 0000000..846b6c3
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+
+/**
+ * Type serializer which keeps track of the serialized objects so that each object is only
+ * serialized once. If the same object shall be serialized again, then a reference handle is
+ * written instead.
+ *
+ * Avoiding duplication is achieved by keeping an internal identity hash map. This map contains
+ * all serialized objects. To make the serializer work it is important that the same serializer
+ * is used for a coherent serialization run. After the serialization has stopped, the identity
+ * hash map should be cleared.
+ *
+ * @param <T> Type of the element to be serialized
+ */
+public class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
+       private static final long serialVersionUID = -7633631762221447524L;
+
+       // underlying type serializer
+       private final TypeSerializer<T> typeSerializer;
+
+       // here we store the already serialized objects
+       private transient IdentityHashMap<T, Integer> identityMap;
+
+       // here we store the already deserialized objects
+       private transient ArrayList<T> elementList;
+
+       public NonDuplicatingTypeSerializer(final TypeSerializer<T> typeSerializer) {
+               this.typeSerializer = typeSerializer;
+
+               this.identityMap = new IdentityHashMap<>();
+               this.elementList = new ArrayList<>();
+       }
+
+       public TypeSerializer<T> getTypeSerializer() {
+               return typeSerializer;
+       }
+
+       /**
+        * Clears the data structures containing the already serialized/deserialized objects. This
+        * effectively resets the type serializer.
+        */
+       public void clearReferences() {
+               identityMap.clear();
+               elementList.clear();
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               return typeSerializer.isImmutableType();
+       }
+
+       @Override
+       public TypeSerializer<T> duplicate() {
+               return new NonDuplicatingTypeSerializer<>(typeSerializer);
+       }
+
+       @Override
+       public T createInstance() {
+               return typeSerializer.createInstance();
+       }
+
+       @Override
+       public T copy(T from) {
+               return typeSerializer.copy(from);
+       }
+
+       @Override
+       public T copy(T from, T reuse) {
+               return typeSerializer.copy(from, reuse);
+       }
+
+       @Override
+       public int getLength() {
+               // variable length: every record carries a flag and may be written as a reference handle
+               return -1;
+       }
+
+       /**
+        * Serializes the given record.
+        * <p>
+        * First a boolean indicating whether a reference handle (true) or the object (false) is
+        * written. Then, either the reference handle or the object is written. A newly written
+        * object is registered in the identity map so that later occurrences become handles.
+        *
+        * @param record The record to serialize.
+        * @param target The output view to write the serialized data to.
+        *
+        * @throws IOException
+        */
+       @Override
+       public void serialize(T record, DataOutputView target) throws IOException {
+               if (identityMap.containsKey(record)) {
+                       target.writeBoolean(true);
+                       target.writeInt(identityMap.get(record));
+               } else {
+                       target.writeBoolean(false);
+                       typeSerializer.serialize(record, target);
+                       // the handle is the index the deserializer assigns to this object in elementList
+                       identityMap.put(record, identityMap.size());
+               }
+       }
+
+       /**
+        * Deserializes an object from the input view.
+        * <p>
+        * First it reads a boolean indicating whether a reference handle or a serialized object
+        * follows.
+        *
+        * @param source The input view from which to read the data.
+        * @return The deserialized object
+        * @throws IOException
+        */
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               boolean alreadyRead = source.readBoolean();
+
+               if (alreadyRead) {
+                       int index = source.readInt();
+                       return elementList.get(index);
+               } else {
+                       T element = typeSerializer.deserialize(source);
+                       elementList.add(element);
+
+                       return element;
+               }
+       }
+
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               // go through deserialize/serialize so that the copy keeps the flag-and-handle format
+               // and both reference tables stay in sync with the records seen so far
+               serialize(deserialize(source), target);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof NonDuplicatingTypeSerializer) {
+                       @SuppressWarnings("unchecked")
+                       NonDuplicatingTypeSerializer<T> other = (NonDuplicatingTypeSerializer<T>) obj;
+
+                       return (other.canEqual(this) && typeSerializer.equals(other.typeSerializer));
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof NonDuplicatingTypeSerializer;
+       }
+
+       @Override
+       public int hashCode() {
+               return typeSerializer.hashCode();
+       }
+
+       private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+               ois.defaultReadObject();
+
+               this.identityMap = new IdentityHashMap<>();
+               this.elementList = new ArrayList<>();
+       }
+}
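The reference-handle scheme documented for `NonDuplicatingTypeSerializer` can be reproduced with plain `java.io` streams. This is a sketch with hypothetical classes `RefWriter`, `RefReader` and `RefDemo`: strings stand in for arbitrary records, and `writeUTF`/`readUTF` stand in for the wrapped `TypeSerializer`. Each first occurrence is written in full and registered under the index the reader will assign to it. Later occurrences of the identical object are written as a boolean flag plus an `int` handle.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;

// Writes each distinct object (by identity) once; repeats become a flag plus an int handle.
final class RefWriter {
    private final IdentityHashMap<String, Integer> handles = new IdentityHashMap<>();
    private final DataOutputStream out;

    RefWriter(DataOutputStream out) {
        this.out = out;
    }

    void write(String record) throws IOException {
        Integer handle = handles.get(record);
        if (handle != null) {
            out.writeBoolean(true);   // a reference handle follows
            out.writeInt(handle);
        } else {
            out.writeBoolean(false);  // the full record follows
            out.writeUTF(record);
            handles.put(record, handles.size()); // same index the reader will assign
        }
    }
}

// Mirrors RefWriter: full records are appended to a list, handles index into it.
final class RefReader {
    private final List<String> elements = new ArrayList<>();
    private final DataInputStream in;

    RefReader(DataInputStream in) {
        this.in = in;
    }

    String read() throws IOException {
        if (in.readBoolean()) {
            return elements.get(in.readInt());
        }
        String record = in.readUTF();
        elements.add(record);
        return record;
    }
}

final class RefDemo {
    // Writes the records in order and reads them back with a fresh reader.
    static String[] roundTrip(String... records) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream data = new DataOutputStream(bytes);
            RefWriter writer = new RefWriter(data);
            for (String record : records) {
                writer.write(record);
            }
            data.flush();

            RefReader reader = new RefReader(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            String[] result = new String[records.length];
            for (int i = 0; i < records.length; i++) {
                result[i] = reader.read();
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the writer compares by identity (`IdentityHashMap`), two equal but distinct strings are both written in full. Repeated writes of one instance, in contrast, come back as a single shared instance after reading. As with the serializer, writer and reader state must cover the same run and be reset afterwards.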

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatSelectFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatSelectFunction.java
new file mode 100644
index 0000000..bfbbc23
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatSelectFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Base interface for a pattern select function which can produce multiple resulting elements. A
+ * pattern flat select function is called with a map of detected events which are identified by
+ * their names. The names are defined by the {@link org.apache.flink.cep.pattern.Pattern} specifying
+ * the sought-after pattern. Additionally, a collector is provided as a parameter. The collector
+ * is used to emit an arbitrary number of resulting elements.
+ *
+ * <pre>{@code
+ * PatternStream<IN> pattern = ...;
+ *
+ * DataStream<OUT> result = pattern.flatSelect(new MyPatternFlatSelectFunction());
+ * }</pre>
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output elements
+ */
+public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {
+
+       /**
+        * Generates zero or more resulting elements given a map of detected pattern events. The events
+        * are identified by their specified names.
+        *
+        * @param pattern Map containing the found pattern. Events are identified by their names.
+        * @param out Collector used to output the generated elements
+        * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+        *                                       operation to fail and may trigger recovery.
+        */
+       void flatSelect(Map<String, IN> pattern, Collector<OUT> out) throws Exception;
+}
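The contract described above (one call per detected pattern, with any number of results pushed into the collector) can be illustrated without Flink on the classpath. In the sketch below, `FlatSelectSketch` is a hypothetical stand-in with the same shape as `PatternFlatSelectFunction`, `java.util.function.Consumer` plays the role of Flink's `Collector`, and `ThresholdAlerts` is an invented example function; none of these names come from the commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in with the same shape as PatternFlatSelectFunction;
// Consumer<OUT> plays the role of Flink's Collector<OUT>.
interface FlatSelectSketch<IN, OUT> {
    void flatSelect(Map<String, IN> pattern, Consumer<OUT> out) throws Exception;
}

// Emits one alert per named event whose value exceeds a threshold, so a single
// detected pattern can produce zero, one, or several results.
class ThresholdAlerts implements FlatSelectSketch<Integer, String> {
    private final int threshold;

    ThresholdAlerts(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void flatSelect(Map<String, Integer> pattern, Consumer<String> out) {
        for (Map.Entry<String, Integer> entry : pattern.entrySet()) {
            if (entry.getValue() > threshold) {
                out.accept(entry.getKey() + "=" + entry.getValue());
            }
        }
    }

    // Runs the function on one match and gathers everything it emitted.
    static List<String> collect(Map<String, Integer> pattern, int threshold) {
        List<String> results = new ArrayList<>();
        new ThresholdAlerts(threshold).flatSelect(pattern, results::add);
        return results;
    }
}
```

A `PatternSelectFunction` would instead return exactly one value per match; the flat variant is the one to reach for when a match may legitimately produce no output.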

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternSelectFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternSelectFunction.java
new file mode 100644
index 0000000..c403529
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternSelectFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Base interface for a pattern select function. A pattern select function is called with a
+ * map containing the detected events which can be accessed by their names. The names depend on
+ * the definition of the {@link org.apache.flink.cep.pattern.Pattern}. The select method returns
+ * exactly one result. If you want to return more than one result, then you have to implement
+ * a {@link PatternFlatSelectFunction}.
+ *
+ * <pre>{@code
+ * PatternStream<IN> pattern = ...;
+ *
+ * DataStream<OUT> result = pattern.select(new MyPatternSelectFunction());
+ *}</pre>
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output element
+ */
+public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {
+
+       /**
+        * Generates a result from the given map of events. The events are identified by their names.
+        * Only one resulting element can be generated.
+        *
+        * @param pattern Map containing the found pattern. Events are identified by their names
+        * @return Resulting element
+        * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+        *                                       operation to fail and may trigger recovery.
+        */
+       OUT select(Map<String, IN> pattern) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
new file mode 100644
index 0000000..63ed3b4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.util.Collector;
+
+import java.util.Map;
+
+/**
+ * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
+ * pattern sequences as a map of events associated with their names. The pattern is detected using a
+ * {@link org.apache.flink.cep.nfa.NFA}. In order to process the detected sequences, the user
+ * has to specify a {@link PatternSelectFunction} or a {@link PatternFlatSelectFunction}.
+ *
+ * @param <T> Type of the events
+ */
+public class PatternStream<T> {
+
+       // underlying data stream
+       private final DataStream<Map<String, T>> patternStream;
+       // type information of input type T
+       private final TypeInformation<T> inputType;
+
+       PatternStream(final DataStream<Map<String, T>> patternStream, final TypeInformation<T> inputType) {
+               this.patternStream = patternStream;
+               this.inputType = inputType;
+       }
+
+       /**
+        * Applies a select function to the detected pattern sequence. For each pattern sequence the
+        * provided {@link PatternSelectFunction} is called. The pattern select function can produce
+        * exactly one resulting element.
+        *
+        * @param patternSelectFunction The pattern select function which is called for each detected
+        *                              pattern sequence.
+        * @param <R> Type of the resulting elements
+        * @return {@link DataStream} which contains the resulting elements from the pattern select
+        *         function.
+        */
+       public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction) {
+               // we have to extract the output type from the provided pattern selection function manually
+               // because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
+               TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+                       patternSelectFunction,
+                       PatternSelectFunction.class,
+                       false,
+                       false,
+                       inputType,
+                       null,
+                       false);
+
+               return patternStream.map(
+                       new PatternSelectMapper<T, R>(
+                               patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
+                       .returns(outTypeInfo);
+       }
+
+       /**
+        * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+        * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
+        * can produce an arbitrary number of resulting elements.
+        *
+        * @param patternFlatSelectFunction The pattern flat select function which is called for each
+        *                                  detected pattern sequence.
+        * @param <R> Type of the resulting elements
+        * @return {@link DataStream} which contains the resulting elements from the pattern flat select
+        *         function.
+        */
+       public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+               // we have to extract the output type from the provided pattern flat select function manually
+               // because the TypeExtractor cannot do that if the method is wrapped in a FlatMapFunction
+               TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+                       patternFlatSelectFunction,
+                       PatternFlatSelectFunction.class,
+                       false,
+                       false,
+                       inputType,
+                       null,
+                       false);
+
+               return patternStream.flatMap(
+                       new PatternFlatSelectMapper<T, R>(
+                               patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
+                       )).returns(outTypeInfo);
+       }
+
+       /**
+        * Wrapper for a {@link PatternSelectFunction}.
+        *
+        * @param <T> Type of the input elements
+        * @param <R> Type of the resulting elements
+        */
+       private static class PatternSelectMapper<T, R> implements MapFunction<Map<String, T>, R> {
+               private static final long serialVersionUID = 2273300432692943064L;
+
+               private final PatternSelectFunction<T, R> patternSelectFunction;
+
+               public PatternSelectMapper(PatternSelectFunction<T, R> patternSelectFunction) {
+                       this.patternSelectFunction = patternSelectFunction;
+               }
+
+               @Override
+               public R map(Map<String, T> value) throws Exception {
+                       return patternSelectFunction.select(value);
+               }
+       }
+
+       /**
+        * Wrapper for a {@link PatternFlatSelectFunction}.
+        *
+        * @param <T> Type of the input elements
+        * @param <R> Type of the resulting elements
+        */
+       private static class PatternFlatSelectMapper<T, R> implements FlatMapFunction<Map<String, T>, R> {
+
+               private static final long serialVersionUID = -8610796233077989108L;
+
+               private final PatternFlatSelectFunction<T, R> patternFlatSelectFunction;
+
+               public PatternFlatSelectMapper(PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+                       this.patternFlatSelectFunction = patternFlatSelectFunction;
+               }
+
+
+               @Override
+               public void flatMap(Map<String, T> value, Collector<R> out) throws Exception {
+                       patternFlatSelectFunction.flatSelect(value, out);
+               }
+       }
+}
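`PatternStream` does not run the user functions itself: it adapts a per-match `select` to an ordinary map and a per-match `flatSelect` to an ordinary flatMap over the stream of detected patterns. The sketch below reproduces that adapter idea with `java.util.stream` standing in for a `DataStream`; `SelectAdapters` and its method signatures are hypothetical and only mirror the shape of `PatternSelectMapper` and `PatternFlatSelectMapper`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of the adapter idea behind PatternSelectMapper and
// PatternFlatSelectMapper; java.util.stream stands in for a Flink DataStream.
class SelectAdapters {

    // select: exactly one output per detected match, so a plain map suffices.
    static <T, R> List<R> select(List<Map<String, T>> matches,
                                 Function<Map<String, T>, R> selectFunction) {
        return matches.stream().map(selectFunction).collect(Collectors.toList());
    }

    // flatSelect: zero or more outputs per match. The function pushes results into a
    // collector-like callback; the per-match buffers are then flattened into one list.
    static <T, R> List<R> flatSelect(List<Map<String, T>> matches,
                                     BiConsumer<Map<String, T>, Consumer<R>> flatSelectFunction) {
        return matches.stream()
                .flatMap(match -> {
                    List<R> buffer = new ArrayList<>();
                    flatSelectFunction.accept(match, buffer::add);
                    return buffer.stream();
                })
                .collect(Collectors.toList());
    }
}
```

In this sketch the Java compiler infers `R` statically. In the commit, the wrapped function is hidden behind a `MapFunction` or `FlatMapFunction`, so the output `TypeInformation` is extracted from the user function explicitly and passed to `returns(...)`.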

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
new file mode 100644
index 0000000..3f44fba
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+/**
+ * Helper class which encapsulates the state of the NFA computation. It points to the current state,
+ * the last taken event, its occurrence timestamp, the current version and the starting timestamp
+ * of the overall pattern.
+ *
+ * @param <T> Type of the input events
+ */
+public class ComputationState<T> {
+       // pointer to the NFA state of the computation
+       private final State<T> state;
+
+       // the last taken event
+       private final T event;
+
+       // timestamp of the last taken event
+       private final long timestamp;
+
+       // The current version of the state, used to discriminate the valid pattern paths in the SharedBuffer
+       private final DeweyNumber version;
+
+       // Timestamp of the first element in the pattern
+       private final long startTimestamp;
+
+       public ComputationState(
+               final State<T> currentState,
+               final T event,
+               final long timestamp,
+               final DeweyNumber version,
+               final long startTimestamp) {
+               this.state = currentState;
+               this.event = event;
+               this.timestamp = timestamp;
+               this.version = version;
+               this.startTimestamp = startTimestamp;
+       }
+
+       public boolean isFinalState() {
+               return state.isFinal();
+       }
+
+       public boolean isStartState() {
+               return state.isStart();
+       }
+
+       public long getTimestamp() {
+               return timestamp;
+       }
+
+       public long getStartTimestamp() {
+               return startTimestamp;
+       }
+
+       public State<T> getState() {
+               return state;
+       }
+
+       public T getEvent() {
+               return event;
+       }
+
+       public DeweyNumber getVersion() {
+               return version;
+       }
+}
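Storing the start timestamp in each `ComputationState` lets the NFA drop partial matches that can no longer complete within the pattern's time window. The sketch below isolates the expiry condition that `NFA.process` applies further down in this commit. `PartialMatch` and `isExpired` are hypothetical names, and the real code additionally releases and removes the corresponding entries from the `SharedBuffer`.

```java
// Hypothetical, trimmed-down stand-in for ComputationState: only the data needed
// to decide whether a partial match has outlived the pattern window.
class PartialMatch {
    // true while the computation still sits in the NFA's start state
    final boolean atStartState;
    // timestamp of the first event of the (partial) pattern match
    final long startTimestamp;

    PartialMatch(boolean atStartState, long startTimestamp) {
        this.atStartState = atStartState;
        this.startTimestamp = startTimestamp;
    }

    // Same condition as in NFA.process: a computation that has left the start state
    // expires once the current event is more than windowTime after the match started.
    // A windowTime of 0 or less disables the check.
    boolean isExpired(long currentTimestamp, long windowTime) {
        return !atStartState
                && windowTime > 0
                && currentTimestamp - startTimestamp > windowTime;
    }
}
```

Because the comparison is strict, a partial match whose events span exactly `windowTime` is still kept.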

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
new file mode 100644
index 0000000..bb9039d
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Versioning scheme which allows retrieving dependencies between different versions.
+ *
+ * A dewey number consists of a sequence of digits d1.d2.d3. ... .dn. A dewey number v is compatible
+ * with v' iff v contains v' as a prefix or if both dewey numbers differ only in the last digit and
+ * the last digit of v is greater than or equal to the last digit of v'.
+ *
+ */
+public class DeweyNumber implements Serializable {
+
+       private static final long serialVersionUID = 6170434818252267825L;
+
+       // sequence of digits
+       private final int[] deweyNumber;
+
+       public DeweyNumber(int start) {
+               deweyNumber = new int[]{start};
+       }
+
+       protected DeweyNumber(int[] deweyNumber) {
+               this.deweyNumber = deweyNumber;
+       }
+
+       /**
+        * Checks whether this dewey number is compatible with the other dewey number.
+        *
+        * True iff this contains other as a prefix or iff both agree on all digits except the last one and
+        * the last digit of this is greater than or equal to the last digit of other.
+        *
+        * @param other The other dewey number to check compatibility against
+        * @return Whether this dewey number is compatible with the other dewey number
+        */
+       public boolean isCompatibleWith(DeweyNumber other) {
+               if (length() > other.length()) {
+                       // prefix case
+                       for (int i = 0; i < other.length(); i++) {
+                               if (other.deweyNumber[i] != deweyNumber[i]) {
+                                       return false;
+                               }
+                       }
+
+                       return true;
+               } else if (length() == other.length()) {
+                       // check init digits for equality
+                       int lastIndex = length() - 1;
+                       for (int i = 0; i < lastIndex; i++) {
+                               if (other.deweyNumber[i] != deweyNumber[i]) {
+                                       return false;
+                               }
+                       }
+
+                       // check that the last digit is greater or equal
+                       return deweyNumber[lastIndex] >= other.deweyNumber[lastIndex];
+               } else {
+                       return false;
+               }
+       }
+
+       public int length() {
+               return deweyNumber.length;
+       }
+
+       /**
+        * Creates a new dewey number from this such that its last digit is increased by
+        * one.
+        *
+        * @return A new dewey number derived from this whose last digit is increased by one
+        */
+       public DeweyNumber increase() {
+               int[] newDeweyNumber = Arrays.copyOf(deweyNumber, deweyNumber.length);
+               newDeweyNumber[deweyNumber.length - 1]++;
+
+               return new DeweyNumber(newDeweyNumber);
+       }
+
+       /**
+        * Creates a new dewey number from this such that a 0 is appended as new last digit.
+        *
+        * @return A new dewey number which contains this as a prefix and has 0 as last digit
+        */
+       public DeweyNumber addStage() {
+               int[] newDeweyNumber = Arrays.copyOf(deweyNumber, deweyNumber.length + 1);
+
+               return new DeweyNumber(newDeweyNumber);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof DeweyNumber) {
+                       DeweyNumber other = (DeweyNumber) obj;
+
+                       return Arrays.equals(deweyNumber, other.deweyNumber);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return Arrays.hashCode(deweyNumber);
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder builder = new StringBuilder();
+
+               for (int i = 0; i < length() - 1; i++) {
+                       builder.append(deweyNumber[i]).append(".");
+               }
+
+               if (length() > 0) {
+                       builder.append(deweyNumber[length() - 1]);
+               }
+
+               return builder.toString();
+       }
+
+       /**
+        * Creates a dewey number from a string representation. The input string must be a dot separated
+        * string of integers.
+        *
+        * @param deweyNumberString Dot separated string of integers
+        * @return Dewey number generated from the given input string
+        */
+       public static DeweyNumber fromString(final String deweyNumberString) {
+               String[] splits = deweyNumberString.split("\\.");
+
+               if (splits.length == 0) {
+                       return new DeweyNumber(Integer.parseInt(deweyNumberString));
+               } else {
+                       int[] deweyNumber = new int[splits.length];
+
+                       for (int i = 0; i < splits.length; i++) {
+                               deweyNumber[i] = Integer.parseInt(splits[i]);
+                       }
+
+                       return new DeweyNumber(deweyNumber);
+               }
+       }
+}
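The compatibility rule is easiest to see on small examples. The sketch below re-implements the documented `DeweyNumber` operations on a plain `int[]` so they can be exercised without Flink; `DeweySketch` is a hypothetical name. It follows the code above (last digit compared with `>=`), so a number is also compatible with itself.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical, self-contained re-implementation of the DeweyNumber rules above,
// small enough to exercise the versioning scheme without Flink on the classpath.
class DeweySketch {
    private final int[] digits;

    DeweySketch(int... digits) {
        this.digits = digits;
    }

    // Compatible iff other is a strict prefix of this, or both have the same length,
    // agree on all but the last digit, and this last digit is >= the other's.
    boolean isCompatibleWith(DeweySketch other) {
        if (digits.length > other.digits.length) {
            for (int i = 0; i < other.digits.length; i++) {
                if (digits[i] != other.digits[i]) {
                    return false;
                }
            }
            return true;
        } else if (digits.length == other.digits.length) {
            int last = digits.length - 1;
            for (int i = 0; i < last; i++) {
                if (digits[i] != other.digits[i]) {
                    return false;
                }
            }
            return digits[last] >= other.digits[last];
        }
        return false;
    }

    // New number with the last digit incremented, e.g. 1.0 -> 1.1
    DeweySketch increase() {
        int[] next = Arrays.copyOf(digits, digits.length);
        next[next.length - 1]++;
        return new DeweySketch(next);
    }

    // New number with a trailing 0 appended, e.g. 1 -> 1.0
    DeweySketch addStage() {
        return new DeweySketch(Arrays.copyOf(digits, digits.length + 1));
    }

    @Override
    public String toString() {
        return Arrays.stream(digits).mapToObj(d -> String.valueOf(d)).collect(Collectors.joining("."));
    }
}
```

In terms of the operations: `addStage()` turns `1` into `1.0`, which is compatible with `1` (prefix case); `increase()` turns `1.0` into `1.1`, which is compatible with `1.0` (last-digit case) but not the other way round.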

http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
new file mode 100644
index 0000000..a68d6eb
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import com.google.common.collect.LinkedHashMultimap;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.Stack;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Non-deterministic finite automaton implementation.
+ * <p>
+ * The NFA processes input events, each of which may change the internal state machine. Whenever a final
+ * state is reached, the matching sequence of events is emitted.
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
+ *
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ *
+ * @param <T> Type of the processed events
+ */
+public class NFA<T> implements Serializable {
+
+       private static final Pattern namePattern = Pattern.compile("^(.*\\[)(\\])$");
+       private static final long serialVersionUID = 2957674889294717265L;
+
+       private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer;
+
+       // Buffer used to store the matched events
+       private final SharedBuffer<State<T>, T> sharedBuffer;
+
+       // Set of all NFA states
+       private final Set<State<T>> states;
+
+       // Length of the window
+       private final long windowTime;
+
+       // Current starting index for the next dewey version number
+       private int startEventCounter;
+
+       // Current set of computation states within the state machine
+       private transient Queue<ComputationState<T>> computationStates;
+
+       public NFA(final TypeSerializer<T> eventSerializer, final long windowTime) {
+               this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
+               this.windowTime = windowTime;
+               sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+               computationStates = new LinkedList<>();
+
+               states = new HashSet<>();
+               startEventCounter = 1;
+       }
+
+       public Set<State<T>> getStates() {
+               return states;
+       }
+
+       public void addStates(final Collection<State<T>> newStates) {
+               for (State<T> state: newStates) {
+                       addState(state);
+               }
+       }
+
+       public void addState(final State<T> state) {
+               states.add(state);
+
+               if (state.isStart()) {
+                       computationStates.add(new ComputationState<>(state, null, -1L, null, -1L));
+               }
+       }
+
+       /**
+        * Processes the next input event. If some of the computations reach a final state then the
+        * resulting event sequences are returned.
+        *
+        * @param event The current event to be processed
+        * @param timestamp The timestamp of the current event
+        * @return The collection of matched patterns (i.e. the result of computations which have
+        * reached a final state)
+        */
+       public Collection<Map<String, T>> process(final T event, final long timestamp) {
+               final int numberComputationStates = computationStates.size();
+               final List<Map<String, T>> result = new ArrayList<>();
+
+               // iterate over all current computations
+               for (int i = 0; i < numberComputationStates; i++) {
+                       ComputationState<T> computationState = computationStates.poll();
+
+                       final Collection<ComputationState<T>> newComputationStates;
+
+                       if (!computationState.isStartState() &&
+                               windowTime > 0 &&
+                               timestamp - computationState.getStartTimestamp() > windowTime) {
+                               // remove computation state which has exceeded the window length
+                               sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
+                               sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
+
+                               newComputationStates = Collections.emptyList();
+                       } else {
+                               newComputationStates = computeNextStates(computationState, event, timestamp);
+                       }
+
+                       for (ComputationState<T> newComputationState: newComputationStates) {
+                               if (newComputationState.isFinalState()) {
+                                       // we've reached a final state and can thus retrieve the matching event sequence
+                                       Collection<Map<String, T>> matches = extractPatternMatches(newComputationState);
+                                       result.addAll(matches);
+
+                                       // remove found patterns because they are no longer needed
+                                       sharedBuffer.release(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp());
+                                       sharedBuffer.remove(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp());
+                               } else {
+                                       // add new computation state; it will be processed once the next event arrives
+                                       computationStates.add(newComputationState);
+                               }
+                       }
+
+                       // prune shared buffer based on window length
+                       if (windowTime > 0) {
+                               long pruningTimestamp = timestamp - windowTime;
+
+                               // remove all elements which are expired with respect to the window length
+                               sharedBuffer.prune(pruningTimestamp);
+                       }
+               }
+
+               return result;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof NFA) {
+                       @SuppressWarnings("unchecked")
+                       NFA<T> other = (NFA<T>) obj;
+
+                       return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
+                               sharedBuffer.equals(other.sharedBuffer) &&
+                               states.equals(other.states) &&
+                               windowTime == other.windowTime &&
+                               startEventCounter == other.startEventCounter;
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime, startEventCounter);
+       }
+
+       /**
+        * Computes the next computation states based on the given computation state, the current event,
+        * its timestamp and the internal state machine.
+        *
+        * @param computationState Current computation state
+        * @param event Current event which is processed
+        * @param timestamp Timestamp of the current event
+        * @return Collection of computation states which result from the current one
+        */
+       private Collection<ComputationState<T>> computeNextStates(
+                       final ComputationState<T> computationState,
+                       final T event,
+                       final long timestamp) {
+               Stack<State<T>> states = new Stack<>();
+               ArrayList<ComputationState<T>> resultingComputationStates = new ArrayList<>();
+               State<T> state = computationState.getState();
+
+               states.push(state);
+
+               while (!states.isEmpty()) {
+                       State<T> currentState = states.pop();
+                       Collection<StateTransition<T>> stateTransitions = currentState.getStateTransitions();
+
+                       // check all state transitions for each state
+                       for (StateTransition<T> stateTransition: stateTransitions) {
+                               try {
+                                       if (stateTransition.getCondition() == null || stateTransition.getCondition().filter(event)) {
+                                               // filter condition is true
+                                               switch (stateTransition.getAction()) {
+                                                       case PROCEED:
+                                                               // simply advance the computation state, but apply the current event to it
+                                                               // PROCEED is equivalent to an epsilon transition
+                                                               states.push(stateTransition.getTargetState());
+                                                               break;
+                                                       case IGNORE:
+                                                               resultingComputationStates.add(computationState);
+
+                                                               // we have a new computation state referring to the same shared entry
+                                                               // the lock of the current computation state is released later on
+                                                               sharedBuffer.lock(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
+                                                               break;
+                                                       case TAKE:
+                                                               final State<T> newState = stateTransition.getTargetState();
+                                                               final DeweyNumber newVersion;
+                                                               final State<T> previousState = computationState.getState();
+                                                               final T previousEvent = computationState.getEvent();
+                                                               final long previousTimestamp;
+                                                               final long startTimestamp;
+
+                                                               if (computationState.isStartState()) {
+                                                                       newVersion = new DeweyNumber(startEventCounter++);
+                                                                       startTimestamp = timestamp;
+                                                                       previousTimestamp = -1L;
+
+                                                               } else {
+                                                                       startTimestamp = computationState.getStartTimestamp();
+                                                                       previousTimestamp = computationState.getTimestamp();
+
+                                                                       if (newState.equals(computationState.getState())) {
+                                                                               newVersion = computationState.getVersion().increase();
+                                                                       } else {
+                                                                               newVersion = computationState.getVersion().addStage();
+                                                                       }
+                                                               }
+
+                                                               sharedBuffer.put(
+                                                                       newState,
+                                                                       event,
+                                                                       timestamp,
+                                                                       previousState,
+                                                                       previousEvent,
+                                                                       previousTimestamp,
+                                                                       newVersion);
+
+                                                               // a new computation state is referring to the shared entry
+                                                               sharedBuffer.lock(newState, event, timestamp);
+
+                                                               resultingComputationStates.add(new ComputationState<T>(
+                                                                       newState,
+                                                                       event,
+                                                                       timestamp,
+                                                                       newVersion,
+                                                                       startTimestamp));
+                                                               break;
+                                               }
+                                       }
+                               } catch (Exception e) {
+                                       throw new RuntimeException("Failure happened in filter function.", e);
+                               }
+                       }
+               }
+
+               if (computationState.isStartState()) {
+                       // a computation state is always kept if it refers to a starting state because every
+                       // new element can start a new pattern
+                       resultingComputationStates.add(computationState);
+               } else {
+                       // release the shared entry referenced by the current computation state.
+                       sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
+                       // try to remove unnecessary shared buffer entries
+                       sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
+               }
+
+               return resultingComputationStates;
+       }
+
+       /**
+        * Extracts all the sequences of events from the start to the given computation state. An event
+        * sequence is returned as a map which contains the events and the names of the states to which
+        * the events were mapped.
+        *
+        * @param computationState The end computation state of the extracted event sequences
+        * @return Collection of event sequences which end in the given computation state
+        */
+       private Collection<Map<String, T>> extractPatternMatches(final ComputationState<T> computationState) {
+               Collection<LinkedHashMultimap<State<T>, T>> paths = sharedBuffer.extractPatterns(
+                       computationState.getState(),
+                       computationState.getEvent(),
+                       computationState.getTimestamp(),
+                       computationState.getVersion());
+
+               ArrayList<Map<String, T>> result = new ArrayList<>();
+
+               TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer();
+
+               // generate the correct names from the collection of LinkedHashMultimaps
+               for (LinkedHashMultimap<State<T>, T> path: paths) {
+                       Map<String, T> resultPath = new HashMap<>();
+                       for (State<T> key: path.keySet()) {
+                               int counter = 0;
+                               Set<T> events = path.get(key);
+
+                               // we iterate over the elements in insertion order
+                               for (T event: events) {
+                                       resultPath.put(
+                                               events.size() > 1 ? generateStateName(key.getName(), counter++): key.getName(),
+                                               // copy the element so that the user can change it
+                                               serializer.isImmutableType() ? event : serializer.copy(event)
+                                       );
+                               }
+                       }
+
+                       result.add(resultPath);
+               }
+
+               return result;
+       }
+
+       private void writeObject(ObjectOutputStream oos) throws IOException {
+               oos.defaultWriteObject();
+
+               oos.writeInt(computationStates.size());
+
+               for (ComputationState<T> computationState: computationStates) {
+                       writeComputationState(computationState, oos);
+               }
+
+               nonDuplicatingTypeSerializer.clearReferences();
+       }
+
+       private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+               ois.defaultReadObject();
+
+               int numberComputationStates = ois.readInt();
+
+               computationStates = new LinkedList<>();
+
+               for (int i = 0; i < numberComputationStates; i++) {
+                       ComputationState<T> computationState = readComputationState(ois);
+
+                       computationStates.offer(computationState);
+               }
+
+               nonDuplicatingTypeSerializer.clearReferences();
+       }
+
+       private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException {
+               oos.writeObject(computationState.getState());
+               oos.writeLong(computationState.getTimestamp());
+               oos.writeObject(computationState.getVersion());
+               oos.writeLong(computationState.getStartTimestamp());
+
+               DataOutputViewStreamWrapper output = new DataOutputViewStreamWrapper(oos);
+
+               nonDuplicatingTypeSerializer.serialize(computationState.getEvent(), output);
+       }
+
+       @SuppressWarnings("unchecked")
+       private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+               final State<T> state = (State<T>)ois.readObject();
+               final long timestamp = ois.readLong();
+               final DeweyNumber version = (DeweyNumber)ois.readObject();
+               final long startTimestamp = ois.readLong();
+
+               DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(ois);
+               final T event = nonDuplicatingTypeSerializer.deserialize(input);
+
+               return new ComputationState<>(state, event, timestamp, version, startTimestamp);
+       }
+
+       /**
+        * Generates a state name from a given name template and an index.
+        * <p>
+        * If the template ends with "[]" the index is inserted in between the square brackets.
+        * Otherwise, an underscore and the index are appended to the name.
+        *
+        * @param name Name template
+        * @param index Index of the state
+        * @return Generated state name from the given state name template
+        */
+       static String generateStateName(final String name, final int index) {
+               Matcher matcher = namePattern.matcher(name);
+
+               if (matcher.matches()) {
+                       return matcher.group(1) + index + matcher.group(2);
+               } else {
+                       return name + "_" + index;
+               }
+       }
+}
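The naming rule documented for `generateStateName`, together with the way `extractPatternMatches` keys each event of a match, can be tried out in isolation. The following is a minimal, self-contained sketch, not Flink code. `namePattern` is defined outside this hunk, so the regex `^(.*\[)(\])$` below is an assumption derived from the Javadoc ("ends with `[]`"). `StateNaming` and `nameEvents` are hypothetical names. The sketch increments the counter once per event, so several events mapped to the same state get distinct keys instead of overwriting each other in the result map.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StateNaming {
    // Assumed template regex: group 1 is everything up to and including "[",
    // group 2 is the closing "]" at the very end of the name.
    private static final Pattern NAME_PATTERN = Pattern.compile("^(.*\\[)(\\])$");

    // Follows the rule described in the Javadoc of NFA.generateStateName.
    public static String generateStateName(String name, int index) {
        Matcher matcher = NAME_PATTERN.matcher(name);
        if (matcher.matches()) {
            return matcher.group(1) + index + matcher.group(2);
        } else {
            return name + "_" + index;
        }
    }

    // Hypothetical helper modelled on the inner loop of extractPatternMatches:
    // a lone event keeps the plain state name, several events are numbered.
    public static Map<String, String> nameEvents(String stateName, List<String> events) {
        Map<String, String> result = new LinkedHashMap<>();
        int counter = 0;
        for (String event : events) {
            String key = events.size() > 1 ? generateStateName(stateName, counter++) : stateName;
            result.put(key, event);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(generateStateName("middle[]", 1)); // middle[1]
        System.out.println(generateStateName("middle", 1));   // middle_1
        System.out.println(nameEvents("middle", Arrays.asList("e1", "e2"))); // {middle_0=e1, middle_1=e2}
    }
}
```

A user template such as `"middle[]"` therefore yields keys like `middle[0]`, `middle[1]`, while a plain name falls back to the underscore suffix.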

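In the `TAKE` branch of `computeNextStates`, each partial match is versioned with a Dewey number. A new start event gets `new DeweyNumber(startEventCounter++)`, taking the same state again calls `increase()`, and moving on to a different state calls `addStage()`. `DeweyNumber.java` is not part of this hunk, so the class below is a hypothetical, simplified stand-in. It assumes that `increase()` bumps the last digit and that `addStage()` appends a new digit starting at 0.

```java
import java.util.Arrays;

// Hypothetical, simplified stand-in for org.apache.flink.cep.nfa.DeweyNumber.
public final class Dewey {
    private final int[] digits;

    public Dewey(int start) {
        this(new int[] {start});
    }

    private Dewey(int[] digits) {
        this.digits = digits;
    }

    // Same state taken again: copy the digits and bump the last one.
    public Dewey increase() {
        int[] next = Arrays.copyOf(digits, digits.length);
        next[next.length - 1]++;
        return new Dewey(next);
    }

    // Transition to a different state: copy and append a new stage (padded with 0).
    public Dewey addStage() {
        int[] next = Arrays.copyOf(digits, digits.length + 1);
        return new Dewey(next);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < digits.length; i++) {
            if (i > 0) {
                sb.append('.');
            }
            sb.append(digits[i]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Dewey start = new Dewey(3);      // fourth start event seen by the NFA
        Dewey next = start.addStage();   // moved on to a different state
        Dewey loop = next.increase();    // took that same state again
        System.out.println(start + " " + next + " " + loop); // 3 3.0 3.1
    }
}
```

In this simplified model, versions only grow by bumping or appending digits. Runs that branched from a common ancestor therefore share a prefix, which is what allows a Dewey-versioned buffer to tell their stored entries apart during extraction.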